You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/12/14 15:27:19 UTC

[arrow] branch master updated: ARROW-4015: [Plasma] remove unused interfaces for plasma manager

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c41303  ARROW-4015: [Plasma] remove unused interfaces for plasma manager
8c41303 is described below

commit 8c413036775796d9bcc52be56373bbb45de8c0ae
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Fri Dec 14 07:27:08 2018 -0800

    ARROW-4015: [Plasma] remove unused interfaces for plasma manager
    
    https://github.com/apache/arrow/issues/3154
    
    This removes the unused Plasma client interfaces Fetch(), Wait(), Transfer() and Info(). They all depend on the plasma manager, which has already been removed from Ray.
    
    Author: Philipp Moritz <pc...@gmail.com>
    Author: Zhijun Fu <pi...@antfin.com>
    Author: Robert Nishihara <ro...@gmail.com>
    
    Closes #3167 from zhijunfu/remove-legacy-interfaces and squashes the following commits:
    
    0efb5005f <Philipp Moritz> fix tensorflow op
    be92e9085 <Philipp Moritz> fix java client
    9da2cd38b <Philipp Moritz> Update _plasma.pyx
    16ec63e9a <Robert Nishihara> More updates
    e7413f739 <Robert Nishihara> Update _plasma.pyx
    21398b5e7 <Zhijun Fu> merge
    bcb320400 <Zhijun Fu> address comments
    7967aea09 <Philipp Moritz> Merge branch 'master' into remove-legacy-interfaces
    583cd97c4 <Zhijun Fu> ARROW-4015:  remove unused interfaces for plasma manager
---
 c_glib/plasma-glib/client.cpp                      |   3 +-
 cpp/apidoc/tutorials/plasma.md                     |   8 +-
 cpp/apidoc/tutorials/tensor_to_py.md               |   2 +-
 cpp/src/plasma/client.cc                           | 111 +---------------
 cpp/src/plasma/client.h                            | 100 +-------------
 cpp/src/plasma/common.cc                           |   3 -
 cpp/src/plasma/common.h                            |  24 ----
 cpp/src/plasma/format/plasma.fbs                   |  74 -----------
 .../org_apache_arrow_plasma_PlasmaClientJNI.cc     |  73 -----------
 cpp/src/plasma/plasma.h                            |   3 -
 cpp/src/plasma/protocol.cc                         | 143 ---------------------
 cpp/src/plasma/protocol.h                          |  35 -----
 cpp/src/plasma/test/client_tests.cc                |   2 -
 cpp/src/plasma/test/serialization_tests.cc         | 116 -----------------
 docs/source/python/plasma.rst                      |  10 +-
 .../org/apache/arrow/plasma/ObjectStoreLink.java   |  27 ----
 .../java/org/apache/arrow/plasma/PlasmaClient.java |  23 ----
 python/benchmarks/plasma.py                        |   4 +-
 python/examples/plasma/sorting/sort_df.py          |   2 +-
 python/pyarrow/_plasma.pyx                         | 130 +------------------
 python/pyarrow/tensorflow/plasma_op.cc             |  18 +--
 python/pyarrow/tests/test_plasma.py                |  16 +--
 python/pyarrow/tests/test_plasma_tf_op.py          |   8 +-
 23 files changed, 41 insertions(+), 894 deletions(-)

diff --git a/c_glib/plasma-glib/client.cpp b/c_glib/plasma-glib/client.cpp
index c05a710..9591a0a 100644
--- a/c_glib/plasma-glib/client.cpp
+++ b/c_glib/plasma-glib/client.cpp
@@ -41,8 +41,7 @@ G_BEGIN_DECLS
  *
  * #GPlasmaClientCreateOptions is a class for customizing object creation.
  *
- * #GPlasmaClient is a class for an interface with a plasma store
- * and a plasma manager.
+ * #GPlasmaClient is a class for an interface with a plasma store.
  *
  * Since: 0.12.0
  */
diff --git a/cpp/apidoc/tutorials/plasma.md b/cpp/apidoc/tutorials/plasma.md
index 472d479..b9046d5 100644
--- a/cpp/apidoc/tutorials/plasma.md
+++ b/cpp/apidoc/tutorials/plasma.md
@@ -80,7 +80,7 @@ using namespace plasma;
 int main(int argc, char** argv) {
   // Start up and connect a Plasma client.
   PlasmaClient client;
-  ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
+  ARROW_CHECK_OK(client.Connect("/tmp/plasma"));
   // Disconnect the Plasma client.
   ARROW_CHECK_OK(client.Disconnect());
 }
@@ -226,7 +226,7 @@ using namespace plasma;
 int main(int argc, char** argv) {
   // Start up and connect a Plasma client.
   PlasmaClient client;
-  ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
+  ARROW_CHECK_OK(client.Connect("/tmp/plasma"));
   // Create an object with a fixed ObjectID.
   ObjectID object_id = ObjectID::from_binary("00000000000000000000");
   int64_t data_size = 1000;
@@ -332,7 +332,7 @@ using namespace plasma;
 int main(int argc, char** argv) {
   // Start up and connect a Plasma client.
   PlasmaClient client;
-  ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
+  ARROW_CHECK_OK(client.Connect("/tmp/plasma"));
   ObjectID object_id = ObjectID::from_binary("00000000000000000000");
   ObjectBuffer object_buffer;
   ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer));
@@ -421,7 +421,7 @@ using namespace plasma;
 int main(int argc, char** argv) {
   // Start up and connect a Plasma client.
   PlasmaClient client;
-  ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
+  ARROW_CHECK_OK(client.Connect("/tmp/plasma"));
 
   int fd;
   ARROW_CHECK_OK(client.Subscribe(&fd));
diff --git a/cpp/apidoc/tutorials/tensor_to_py.md b/cpp/apidoc/tutorials/tensor_to_py.md
index 0be973a..cd191fe 100644
--- a/cpp/apidoc/tutorials/tensor_to_py.md
+++ b/cpp/apidoc/tutorials/tensor_to_py.md
@@ -105,7 +105,7 @@ The `inputs` variable will be a list of Object IDs in their raw byte string form
 import pyarrow as pa
 import pyarrow.plasma as plasma
 
-plasma_client = plasma.connect('/tmp/plasma', '', 0)
+plasma_client = plasma.connect('/tmp/plasma')
 
 # inputs: a list of object ids
 inputs = [20 * b'1']
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 2dbe2b4..4215399 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -198,17 +198,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
 
   Status Disconnect();
 
-  Status Fetch(int num_object_ids, const ObjectID* object_ids);
-
-  Status Wait(int64_t num_object_requests, ObjectRequest* object_requests,
-              int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
-
-  Status Transfer(const char* addr, int port, const ObjectID& object_id);
-
-  Status Info(const ObjectID& object_id, int* object_status);
-
-  int get_manager_fd() const;
-
   bool IsInUse(const ObjectID& object_id);
 
  private:
@@ -250,8 +239,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
 
   /// File descriptor of the Unix domain socket that connects to the store.
   int store_conn_;
-  /// File descriptor of the Unix domain socket that connects to the manager.
-  int manager_conn_;
   /// Table of dlmalloc buffer files that have been memory mapped so far. This
   /// is a hash table mapping a file descriptor to a struct containing the
   /// address of the corresponding memory-mapped file.
@@ -872,10 +859,7 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
                                    int release_delay, int num_retries) {
   RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
   if (manager_socket_name != "") {
-    RETURN_NOT_OK(
-        ConnectIpcSocketRetry(manager_socket_name, num_retries, -1, &manager_conn_));
-  } else {
-    manager_conn_ = -1;
+    return Status::NotImplemented("plasma manager is no longer supported");
   }
   if (release_delay != 0) {
     ARROW_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect "
@@ -898,78 +882,6 @@ Status PlasmaClient::Impl::Disconnect() {
   // that were in use by us when handling the SIGPIPE.
   close(store_conn_);
   store_conn_ = -1;
-  if (manager_conn_ >= 0) {
-    close(manager_conn_);
-    manager_conn_ = -1;
-  }
-  return Status::OK();
-}
-
-Status PlasmaClient::Impl::Transfer(const char* address, int port,
-                                    const ObjectID& object_id) {
-  return SendDataRequest(manager_conn_, object_id, address, port);
-}
-
-Status PlasmaClient::Impl::Fetch(int num_object_ids, const ObjectID* object_ids) {
-  ARROW_CHECK(manager_conn_ >= 0);
-  return SendFetchRequest(manager_conn_, object_ids, num_object_ids);
-}
-
-int PlasmaClient::Impl::get_manager_fd() const { return manager_conn_; }
-
-Status PlasmaClient::Impl::Info(const ObjectID& object_id, int* object_status) {
-  ARROW_CHECK(manager_conn_ >= 0);
-
-  RETURN_NOT_OK(SendStatusRequest(manager_conn_, &object_id, 1));
-  std::vector<uint8_t> buffer;
-  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType::PlasmaStatusReply, &buffer));
-  ObjectID id;
-  RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1));
-  ARROW_CHECK(object_id == id);
-  return Status::OK();
-}
-
-Status PlasmaClient::Impl::Wait(int64_t num_object_requests,
-                                ObjectRequest* object_requests, int num_ready_objects,
-                                int64_t timeout_ms, int* num_objects_ready) {
-  ARROW_CHECK(manager_conn_ >= 0);
-  ARROW_CHECK(num_object_requests > 0);
-  ARROW_CHECK(num_ready_objects > 0);
-  ARROW_CHECK(num_ready_objects <= num_object_requests);
-
-  for (int i = 0; i < num_object_requests; ++i) {
-    ARROW_CHECK(object_requests[i].type == ObjectRequestType::PLASMA_QUERY_LOCAL ||
-                object_requests[i].type == ObjectRequestType::PLASMA_QUERY_ANYWHERE);
-  }
-
-  RETURN_NOT_OK(SendWaitRequest(manager_conn_, object_requests, num_object_requests,
-                                num_ready_objects, timeout_ms));
-  std::vector<uint8_t> buffer;
-  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType::PlasmaWaitReply, &buffer));
-  RETURN_NOT_OK(
-      ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects));
-
-  *num_objects_ready = 0;
-  for (int i = 0; i < num_object_requests; ++i) {
-    ObjectRequestType type = object_requests[i].type;
-    auto status = static_cast<fb::ObjectStatus>(object_requests[i].location);
-    switch (type) {
-      case ObjectRequestType::PLASMA_QUERY_LOCAL:
-        if (status == fb::ObjectStatus::Local) {
-          *num_objects_ready += 1;
-        }
-        break;
-      case ObjectRequestType::PLASMA_QUERY_ANYWHERE:
-        if (status == fb::ObjectStatus::Local || status == fb::ObjectStatus::Remote) {
-          *num_objects_ready += 1;
-        } else {
-          ARROW_CHECK(status == fb::ObjectStatus::Nonexistent);
-        }
-        break;
-      default:
-        ARROW_LOG(FATAL) << "This code should be unreachable.";
-    }
-  }
   return Status::OK();
 }
 
@@ -1052,27 +964,6 @@ Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_
 
 Status PlasmaClient::Disconnect() { return impl_->Disconnect(); }
 
-Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) {
-  return impl_->Fetch(num_object_ids, object_ids);
-}
-
-Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_requests,
-                          int num_ready_objects, int64_t timeout_ms,
-                          int* num_objects_ready) {
-  return impl_->Wait(num_object_requests, object_requests, num_ready_objects, timeout_ms,
-                     num_objects_ready);
-}
-
-Status PlasmaClient::Transfer(const char* addr, int port, const ObjectID& object_id) {
-  return impl_->Transfer(addr, port, object_id);
-}
-
-Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
-  return impl_->Info(object_id, object_status);
-}
-
-int PlasmaClient::get_manager_fd() const { return impl_->get_manager_fd(); }
-
 bool PlasmaClient::IsInUse(const ObjectID& object_id) {
   return impl_->IsInUse(object_id);
 }
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 514d2bd..ac9e8eb 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -49,19 +49,20 @@ class ARROW_EXPORT PlasmaClient {
   PlasmaClient();
   ~PlasmaClient();
 
-  /// Connect to the local plasma store and plasma manager. Return
-  /// the resulting connection.
+  /// Connect to the local plasma store. Return the resulting connection.
   ///
   /// \param store_socket_name The name of the UNIX domain socket to use to
   ///        connect to the Plasma store.
   /// \param manager_socket_name The name of the UNIX domain socket to use to
   ///        connect to the local Plasma manager. If this is "", then this
   ///        function will not connect to a manager.
+  ///        Note that plasma manager is no longer supported, this function
+  ///        will return failure if this is not "".
   /// \param release_delay Deprecated (not used).
   /// \param num_retries number of attempts to connect to IPC socket, default 50
   /// \return The return status.
   Status Connect(const std::string& store_socket_name,
-                 const std::string& manager_socket_name, int release_delay = 0,
+                 const std::string& manager_socket_name = "", int release_delay = 0,
                  int num_retries = -1);
 
   /// Create an object in the Plasma Store. Any metadata for this object must be
@@ -249,99 +250,6 @@ class ARROW_EXPORT PlasmaClient {
   /// \return The return status.
   Status Disconnect();
 
-  /// Attempt to initiate the transfer of some objects from remote Plasma
-  /// Stores.
-  /// This method does not guarantee that the fetched objects will arrive
-  /// locally.
-  ///
-  /// For an object that is available in the local Plasma Store, this method
-  /// will
-  /// not do anything. For an object that is not available locally, it will
-  /// check
-  /// if the object are already being fetched. If so, it will not do anything.
-  /// If
-  /// not, it will query the object table for a list of Plasma Managers that
-  /// have
-  /// the object. The object table will return a non-empty list, and this Plasma
-  /// Manager will attempt to initiate transfers from one of those Plasma
-  /// Managers.
-  ///
-  /// This function is non-blocking.
-  ///
-  /// This method is idempotent in the sense that it is ok to call it multiple
-  /// times.
-  ///
-  /// \param num_object_ids The number of object IDs fetch is being called on.
-  /// \param object_ids The IDs of the objects that fetch is being called on.
-  /// \return The return status.
-  Status Fetch(int num_object_ids, const ObjectID* object_ids);
-
-  /// Wait for (1) a specified number of objects to be available (sealed) in the
-  /// local Plasma Store or in a remote Plasma Store, or (2) for a timeout to
-  /// expire. This is a blocking call.
-  ///
-  /// \param num_object_requests Size of the object_requests array.
-  /// \param object_requests Object event array. Each element contains a request
-  ///        for a particular object_id. The type of request is specified in the
-  ///        "type" field.
-  ///        - A PLASMA_QUERY_LOCAL request is satisfied when object_id becomes
-  ///          available in the local Plasma Store. In this case, this function
-  ///          sets the "status" field to ObjectStatus::Local. Note, if the
-  ///          status
-  ///          is not ObjectStatus::Local, it will be ObjectStatus::Nonexistent,
-  ///          but it may exist elsewhere in the system.
-  ///        - A PLASMA_QUERY_ANYWHERE request is satisfied when object_id
-  ///        becomes
-  ///          available either at the local Plasma Store or on a remote Plasma
-  ///          Store. In this case, the functions sets the "status" field to
-  ///          ObjectStatus::Local or ObjectStatus::Remote.
-  /// \param num_ready_objects The number of requests in object_requests array
-  /// that
-  ///        must be satisfied before the function returns, unless it timeouts.
-  ///        The num_ready_objects should be no larger than num_object_requests.
-  /// \param timeout_ms Timeout value in milliseconds. If this timeout expires
-  ///        before min_num_ready_objects of requests are satisfied, the
-  ///        function
-  ///        returns.
-  /// \param num_objects_ready Out parameter for number of satisfied requests in
-  ///        the object_requests list. If the returned number is less than
-  ///        min_num_ready_objects this means that timeout expired.
-  /// \return The return status.
-  Status Wait(int64_t num_object_requests, ObjectRequest* object_requests,
-              int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
-
-  /// Transfer local object to a different plasma manager.
-  ///
-  /// \param addr IP address of the plasma manager we are transfering to.
-  /// \param port Port of the plasma manager we are transfering to.
-  /// \param object_id ObjectID of the object we are transfering.
-  /// \return The return status.
-  Status Transfer(const char* addr, int port, const ObjectID& object_id);
-
-  /// Return the status of a given object. This method may query the object
-  /// table.
-  ///
-  /// \param object_id The ID of the object whose status we query.
-  /// \param object_status Out parameter for object status. Can take the
-  ///         following values.
-  ///         - PLASMA_CLIENT_LOCAL, if object is stored in the local Plasma
-  ///         Store.
-  ///           has been already scheduled by the Plasma Manager.
-  ///         - PLASMA_CLIENT_TRANSFER, if the object is either currently being
-  ///           transferred or just scheduled.
-  ///         - PLASMA_CLIENT_REMOTE, if the object is stored at a remote
-  ///           Plasma Store.
-  ///         - PLASMA_CLIENT_DOES_NOT_EXIST, if the object doesn’t exist in the
-  ///           system.
-  /// \return The return status.
-  Status Info(const ObjectID& object_id, int* object_status);
-
-  /// Get the file descriptor for the socket connection to the plasma manager.
-  ///
-  /// \return The file descriptor for the manager connection. If there is no
-  ///         connection to the manager, this is -1.
-  int get_manager_fd() const;
-
  private:
   friend class PlasmaBuffer;
   FRIEND_TEST(TestPlasmaStore, GetTest);
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index 0ca17cf..1b86fd8 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -107,9 +107,6 @@ bool UniqueID::operator==(const UniqueID& rhs) const {
   return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
 }
 
-ARROW_EXPORT fb::ObjectStatus ObjectStatusLocal = fb::ObjectStatus::Local;
-ARROW_EXPORT fb::ObjectStatus ObjectStatusRemote = fb::ObjectStatus::Remote;
-
 const PlasmaStoreInfo* plasma_config;
 
 }  // namespace plasma
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 7090428..38925fe 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -66,30 +66,6 @@ typedef UniqueID ObjectID;
 /// Size of object hash digests.
 constexpr int64_t kDigestSize = sizeof(uint64_t);
 
-enum class ObjectRequestType : int {
-  /// Query for object in the local plasma store.
-  PLASMA_QUERY_LOCAL = 1,
-  /// Query for object in the local plasma store or in a remote plasma store.
-  PLASMA_QUERY_ANYWHERE
-};
-
-/// Object request data structure. Used for Wait.
-struct ObjectRequest {
-  /// The ID of the requested object. If ID_NIL request any object.
-  ObjectID object_id;
-  /// Request associated to the object. It can take one of the following values:
-  ///  - PLASMA_QUERY_LOCAL: return if or when the object is available in the
-  ///    local Plasma Store.
-  ///  - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
-  ///    the system (i.e., either in the local or a remote Plasma Store).
-  ObjectRequestType type;
-  /// Object location. This can be
-  ///  - ObjectLocation::Local: object is ready at the local Plasma Store.
-  ///  - ObjectLocation::Remote: object is ready at a remote Plasma Store.
-  ///  - ObjectLocation::Nonexistent: object does not exist in the system.
-  ObjectLocation location;
-};
-
 enum class ObjectState : int {
   /// Object was created but not sealed in the local Plasma Store.
   PLASMA_CREATED = 1,
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index ef934fb..b3c8903 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -42,9 +42,6 @@ enum MessageType:long {
   // Delete an object.
   PlasmaDeleteRequest,
   PlasmaDeleteReply,
-  // Get status of an object.
-  PlasmaStatusRequest,
-  PlasmaStatusReply,
   // See if the store contains an object (will be deprecated).
   PlasmaContainsRequest,
   PlasmaContainsReply,
@@ -57,11 +54,6 @@ enum MessageType:long {
   // Make room for new objects in the plasma store.
   PlasmaEvictRequest,
   PlasmaEvictReply,
-  // Fetch objects from remote Plasma stores.
-  PlasmaFetchRequest,
-  // Wait for objects to be ready either from local or remote Plasma stores.
-  PlasmaWaitRequest,
-  PlasmaWaitReply,
   // Subscribe to a list of objects or to all objects.
   PlasmaSubscribeRequest,
   // Unsubscribe.
@@ -239,35 +231,6 @@ table PlasmaDeleteReply {
   errors: [PlasmaError];
 }
 
-table PlasmaStatusRequest {
-  // IDs of the objects stored at local Plasma store we request the status of.
-  object_ids: [string];
-}
-
-enum ObjectStatus:int {
-  // Object is stored in the local Plasma Store.
-  Local,
-  // Object is stored on a remote Plasma store, and it is not stored on the
-  // local Plasma Store.
-  Remote,
-  // Object is not stored in the system.
-  Nonexistent,
-  // Object is currently transferred from a remote Plasma store the local
-  // Plasma Store.
-  Transfer
-}
-
-table PlasmaStatusReply {
-  // IDs of the objects being returned.
-  object_ids: [string];
-  // Status of the object.
-  status: [ObjectStatus];
-}
-
-// PlasmaContains is a subset of PlasmaStatus which does not
-// involve the plasma manager, only the store. We should consider
-// unifying them in the future and deprecating PlasmaContains.
-
 table PlasmaContainsRequest {
   // ID of the object we are querying.
   object_id: string;
@@ -309,43 +272,6 @@ table PlasmaEvictReply {
   num_bytes: ulong;
 }
 
-table PlasmaFetchRequest {
-  // IDs of objects to be gotten.
-  object_ids: [string];
-}
-
-table ObjectRequestSpec {
-  // ID of the object.
-  object_id: string;
-  // The type of the object. This specifies whether we
-  // will be waiting for an object store in the local or
-  // global Plasma store.
-  type: int;
-}
-
-table PlasmaWaitRequest {
-  // Array of object requests whose status we are asking for.
-  object_requests: [ObjectRequestSpec];
-  // Number of objects expected to be returned, if available.
-  num_ready_objects: int;
-  // timeout
-  timeout: long;
-}
-
-table ObjectReply {
-  // ID of the object.
-  object_id: string;
-  // The object status. This specifies where the object is stored.
-  status: ObjectStatus;
-}
-
-table PlasmaWaitReply {
-  // Array of object requests being returned.
-  object_requests: [ObjectReply];
-  // Number of objects expected to be returned, if available.
-  num_ready_objects: int;
-}
-
 table PlasmaSubscribeRequest {
 }
 
diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
index 7cd2f35..fa376ec 100644
--- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
+++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
@@ -220,79 +220,6 @@ JNIEXPORT jboolean JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_contains
   return has_object;
 }
 
-JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_fetch(
-    JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids) {
-  plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);
-  jsize num_oids = env->GetArrayLength(object_ids);
-
-  std::vector<plasma::ObjectID> oids(num_oids);
-  for (int i = 0; i < num_oids; ++i) {
-    jbyteArray_to_object_id(
-        env, reinterpret_cast<jbyteArray>(env->GetObjectArrayElement(object_ids, i)),
-        &oids[i]);
-  }
-
-  ARROW_CHECK_OK(client->Fetch(static_cast<int>(num_oids), oids.data()));
-
-  return;
-}
-
-JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_wait(
-    JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids, jint timeout_ms,
-    jint num_returns) {
-  plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);
-  jsize num_oids = env->GetArrayLength(object_ids);
-
-  if (num_returns < 0) {
-    jclass Exception = env->FindClass("java/lang/RuntimeException");
-    env->ThrowNew(Exception, "The argument num_returns cannot be less than zero.");
-    return nullptr;
-  }
-  if (num_returns > num_oids) {
-    jclass Exception = env->FindClass("java/lang/RuntimeException");
-    env->ThrowNew(Exception,
-                  "The argument num_returns cannot be greater than len(object_ids).");
-    return nullptr;
-  }
-
-  std::vector<plasma::ObjectRequest> oreqs(num_oids);
-
-  for (int i = 0; i < num_oids; ++i) {
-    jbyteArray_to_object_id(
-        env, reinterpret_cast<jbyteArray>(env->GetObjectArrayElement(object_ids, i)),
-        &oreqs[i].object_id);
-    oreqs[i].type = plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE;
-  }
-
-  int num_return_objects;
-  // TODO: may be blocked. consider to add the thread support
-  ARROW_CHECK_OK(client->Wait(static_cast<int>(num_oids), oreqs.data(), num_returns,
-                              static_cast<uint64_t>(timeout_ms), &num_return_objects));
-
-  int num_to_return = std::min(num_return_objects, num_returns);
-  jclass clsByteArray = env->FindClass("[B");
-  jobjectArray ret = env->NewObjectArray(num_to_return, clsByteArray, nullptr);
-
-  int num_returned = 0;
-  jbyteArray oid = nullptr;
-  for (int i = 0; i < num_oids; ++i) {
-    if (num_returned >= num_to_return) {
-      break;
-    }
-
-    if (oreqs[i].location == plasma::ObjectLocation::Local ||
-        oreqs[i].location == plasma::ObjectLocation::Remote) {
-      oid = env->NewByteArray(OBJECT_ID_SIZE);
-      object_id_to_jbyteArray(env, oid, &oreqs[i].object_id);
-      env->SetObjectArrayElement(ret, num_returned, oid);
-      num_returned++;
-    }
-  }
-  ARROW_CHECK(num_returned == num_to_return);
-
-  return ret;
-}
-
 JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_evict(
     JNIEnv* env, jclass cls, jlong conn, jlong num_bytes) {
   plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 83caec7..aafe527 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -68,9 +68,6 @@ constexpr int64_t kBlockSize = 64;
 
 struct Client;
 
-/// Mapping from object IDs to type and status of the request.
-typedef std::unordered_map<ObjectID, ObjectRequest> ObjectRequestMap;
-
 // TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
 struct PlasmaObject {
 #ifdef PLASMA_CUDA
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index c437840..a878647 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -42,10 +42,6 @@ using flatbuffers::uoffset_t;
 #define PLASMA_CHECK_ENUM(x, y) \
   static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")
 
-PLASMA_CHECK_ENUM(ObjectLocation::Local, fb::ObjectStatus::Local);
-PLASMA_CHECK_ENUM(ObjectLocation::Remote, fb::ObjectStatus::Remote);
-PLASMA_CHECK_ENUM(ObjectLocation::Nonexistent, fb::ObjectStatus::Nonexistent);
-
 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
 ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
              int64_t num_objects) {
@@ -367,56 +363,6 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object
   return Status::OK();
 }
 
-// Satus messages.
-
-Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto message =
-      fb::CreatePlasmaStatusRequest(fbb, ToFlatbuffer(&fbb, object_ids, num_objects));
-  return PlasmaSend(sock, MessageType::PlasmaStatusRequest, &fbb, message);
-}
-
-Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
-                         int64_t num_objects) {
-  DCHECK(data);
-  auto message = flatbuffers::GetRoot<fb::PlasmaStatusRequest>(data);
-  DCHECK(VerifyFlatbuffer(message, data, size));
-  for (uoffset_t i = 0; i < num_objects; ++i) {
-    object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
-  }
-  return Status::OK();
-}
-
-Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
-                       int64_t num_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto message =
-      fb::CreatePlasmaStatusReply(fbb, ToFlatbuffer(&fbb, object_ids, num_objects),
-                                  fbb.CreateVector(object_status, num_objects));
-  return PlasmaSend(sock, MessageType::PlasmaStatusReply, &fbb, message);
-}
-
-int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
-  DCHECK(data);
-  auto message = flatbuffers::GetRoot<fb::PlasmaStatusReply>(data);
-  DCHECK(VerifyFlatbuffer(message, data, size));
-  return message->object_ids()->size();
-}
-
-Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
-                       int object_status[], int64_t num_objects) {
-  DCHECK(data);
-  auto message = flatbuffers::GetRoot<fb::PlasmaStatusReply>(data);
-  DCHECK(VerifyFlatbuffer(message, data, size));
-  for (uoffset_t i = 0; i < num_objects; ++i) {
-    object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
-  }
-  for (uoffset_t i = 0; i < num_objects; ++i) {
-    object_status[i] = message->status()->data()[i];
-  }
-  return Status::OK();
-}
-
 // Contains messages.
 
 Status SendContainsRequest(int sock, ObjectID object_id) {
@@ -640,95 +586,6 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
   }
   return Status::OK();
 }
-// Fetch messages.
-
-Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto message =
-      fb::CreatePlasmaFetchRequest(fbb, ToFlatbuffer(&fbb, object_ids, num_objects));
-  return PlasmaSend(sock, MessageType::PlasmaFetchRequest, &fbb, message);
-}
-
-Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
-  DCHECK(data);
-  auto message = flatbuffers::GetRoot<fb::PlasmaFetchRequest>(data);
-  DCHECK(VerifyFlatbuffer(message, data, size));
-  for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
-    object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
-  }
-  return Status::OK();
-}
-
-// Wait messages.
-
-Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
-                       int num_ready_objects, int64_t timeout_ms) {
-  flatbuffers::FlatBufferBuilder fbb;
-
-  std::vector<flatbuffers::Offset<fb::ObjectRequestSpec>> object_request_specs;
-  for (int i = 0; i < num_requests; i++) {
-    object_request_specs.push_back(fb::CreateObjectRequestSpec(
-        fbb, fbb.CreateString(object_requests[i].object_id.binary()),
-        static_cast<int>(object_requests[i].type)));
-  }
-
-  auto message = fb::CreatePlasmaWaitRequest(fbb, fbb.CreateVector(object_request_specs),
-                                             num_ready_objects, timeout_ms);
-  return PlasmaSend(sock, MessageType::PlasmaWaitRequest, &fbb, message);
-}
-
-Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
-                       int64_t* timeout_ms, int* num_ready_objects) {
-  DCHECK(data);
-  auto message = flatbuffers::GetRoot<fb::PlasmaWaitRequest>(data);
-  DCHECK(VerifyFlatbuffer(message, data, size));
-  *num_ready_objects = message->num_ready_objects();
-  *timeout_ms = message->timeout();
-
-  for (uoffset_t i = 0; i < message->object_requests()->size(); i++) {
-    ObjectID object_id =
-        ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str());
-    ObjectRequest object_request(
-        {object_id,
-         static_cast<ObjectRequestType>(message->object_requests()->Get(i)->type()),
-         ObjectLocation::Nonexistent});
-    object_requests[object_id] = object_request;
-  }
-  return Status::OK();
-}
-
-Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
-                     int num_ready_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-
-  std::vector<flatbuffers::Offset<fb::ObjectReply>> object_replies;
-  for (const auto& entry : object_requests) {
-    const auto& object_request = entry.second;
-    object_replies.push_back(
-        fb::CreateObjectReply(fbb, fbb.CreateString(object_request.object_id.binary()),
-                              static_cast<fb::ObjectStatus>(object_request.location)));
-  }
-
-  auto message = fb::CreatePlasmaWaitReply(
-      fbb, fbb.CreateVector(object_replies.data(), num_ready_objects), num_ready_objects);
-  return PlasmaSend(sock, MessageType::PlasmaWaitReply, &fbb, message);
-}
-
-Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
-                     int* num_ready_objects) {
-  DCHECK(data);
-
-  auto message = flatbuffers::GetRoot<fb::PlasmaWaitReply>(data);
-  DCHECK(VerifyFlatbuffer(message, data, size));
-  *num_ready_objects = message->num_ready_objects();
-  for (int i = 0; i < *num_ready_objects; i++) {
-    object_requests[i].object_id =
-        ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str());
-    object_requests[i].location =
-        static_cast<ObjectLocation>(message->object_requests()->Get(i)->status());
-  }
-  return Status::OK();
-}
 
 // Subscribe messages.
 
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index c820458..0362bd4 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -128,21 +128,6 @@ Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
 Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
                        std::vector<PlasmaError>* errors);
 
-/* Satus messages. */
-
-Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
-
-Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
-                         int64_t num_objects);
-
-Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
-                       int64_t num_objects);
-
-int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size);
-
-Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
-                       int object_status[], int64_t num_objects);
-
 /* Plasma Constains message functions. */
 
 Status SendContainsRequest(int sock, ObjectID object_id);
@@ -184,26 +169,6 @@ Status SendEvictReply(int sock, int64_t num_bytes);
 
 Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes);
 
-/* Plasma Fetch Remote message functions. */
-
-Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
-
-Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids);
-
-/* Plasma Wait message functions. */
-
-Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
-                       int num_ready_objects, int64_t timeout_ms);
-
-Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
-                       int64_t* timeout_ms, int* num_ready_objects);
-
-Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
-                     int num_ready_objects);
-
-Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
-                     int* num_ready_objects);
-
 /* Plasma Subscribe message functions. */
 
 Status SendSubscribeRequest(int sock);
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 65a9b71..30dc685 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -187,7 +187,6 @@ TEST_F(TestPlasmaStore, DeleteTest) {
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
   ASSERT_TRUE(has_object);
 
-  // Avoid race condition of Plasma Manager waiting for notification.
   ARROW_CHECK_OK(client_.Release(object_id));
   // object_id is marked as to-be-deleted, when it is not in use, it will be deleted.
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
@@ -251,7 +250,6 @@ TEST_F(TestPlasmaStore, ContainsTest) {
   // First create object.
   std::vector<uint8_t> data(100, 0);
   CreateObject(client_, object_id, {42}, data);
-  // Avoid race condition of Plasma Manager waiting for notification.
   std::vector<ObjectBuffer> object_buffers;
   ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 085ae97..66d651d 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -254,44 +254,6 @@ TEST(PlasmaSerialization, DeleteReply) {
   close(fd);
 }
 
-TEST(PlasmaSerialization, StatusRequest) {
-  int fd = create_temp_file();
-  constexpr int64_t num_objects = 2;
-  ObjectID object_ids[num_objects];
-  object_ids[0] = random_object_id();
-  object_ids[1] = random_object_id();
-  ARROW_CHECK_OK(SendStatusRequest(fd, object_ids, num_objects));
-  std::vector<uint8_t> data =
-      read_message_from_file(fd, MessageType::PlasmaStatusRequest);
-  ObjectID object_ids_read[num_objects];
-  ARROW_CHECK_OK(
-      ReadStatusRequest(data.data(), data.size(), object_ids_read, num_objects));
-  ASSERT_EQ(object_ids[0], object_ids_read[0]);
-  ASSERT_EQ(object_ids[1], object_ids_read[1]);
-  close(fd);
-}
-
-TEST(PlasmaSerialization, StatusReply) {
-  int fd = create_temp_file();
-  ObjectID object_ids[2];
-  object_ids[0] = random_object_id();
-  object_ids[1] = random_object_id();
-  int object_statuses[2] = {42, 43};
-  ARROW_CHECK_OK(SendStatusReply(fd, object_ids, object_statuses, 2));
-  std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaStatusReply);
-  int64_t num_objects = ReadStatusReply_num_objects(data.data(), data.size());
-
-  std::vector<ObjectID> object_ids_read(num_objects);
-  std::vector<int> object_statuses_read(num_objects);
-  ARROW_CHECK_OK(ReadStatusReply(data.data(), data.size(), object_ids_read.data(),
-                                 object_statuses_read.data(), num_objects));
-  ASSERT_EQ(object_ids[0], object_ids_read[0]);
-  ASSERT_EQ(object_ids[1], object_ids_read[1]);
-  ASSERT_EQ(object_statuses[0], object_statuses_read[0]);
-  ASSERT_EQ(object_statuses[1], object_statuses_read[1]);
-  close(fd);
-}
-
 TEST(PlasmaSerialization, EvictRequest) {
   int fd = create_temp_file();
   int64_t num_bytes = 111;
@@ -314,84 +276,6 @@ TEST(PlasmaSerialization, EvictReply) {
   close(fd);
 }
 
-TEST(PlasmaSerialization, FetchRequest) {
-  int fd = create_temp_file();
-  ObjectID object_ids[2];
-  object_ids[0] = random_object_id();
-  object_ids[1] = random_object_id();
-  ARROW_CHECK_OK(SendFetchRequest(fd, object_ids, 2));
-  std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaFetchRequest);
-  std::vector<ObjectID> object_ids_read;
-  ARROW_CHECK_OK(ReadFetchRequest(data.data(), data.size(), object_ids_read));
-  ASSERT_EQ(object_ids[0], object_ids_read[0]);
-  ASSERT_EQ(object_ids[1], object_ids_read[1]);
-  close(fd);
-}
-
-TEST(PlasmaSerialization, WaitRequest) {
-  int fd = create_temp_file();
-  const int num_objects_in = 2;
-  ObjectRequest object_requests_in[num_objects_in] = {
-      ObjectRequest({random_object_id(), ObjectRequestType::PLASMA_QUERY_ANYWHERE,
-                     ObjectLocation::Local}),
-      ObjectRequest({random_object_id(), ObjectRequestType::PLASMA_QUERY_LOCAL,
-                     ObjectLocation::Local})};
-  const int num_ready_objects_in = 1;
-  int64_t timeout_ms = 1000;
-
-  ARROW_CHECK_OK(SendWaitRequest(fd, &object_requests_in[0], num_objects_in,
-                                 num_ready_objects_in, timeout_ms));
-  /* Read message back. */
-  std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaWaitRequest);
-  int num_ready_objects_out;
-  int64_t timeout_ms_read;
-  ObjectRequestMap object_requests_out;
-  ARROW_CHECK_OK(ReadWaitRequest(data.data(), data.size(), object_requests_out,
-                                 &timeout_ms_read, &num_ready_objects_out));
-  ASSERT_EQ(num_objects_in, object_requests_out.size());
-  ASSERT_EQ(num_ready_objects_out, num_ready_objects_in);
-  for (int i = 0; i < num_objects_in; i++) {
-    const ObjectID& object_id = object_requests_in[i].object_id;
-    ASSERT_EQ(1, object_requests_out.count(object_id));
-    const auto& entry = object_requests_out.find(object_id);
-    ASSERT_TRUE(entry != object_requests_out.end());
-    ASSERT_EQ(entry->second.object_id, object_requests_in[i].object_id);
-    ASSERT_EQ(entry->second.type, object_requests_in[i].type);
-  }
-  close(fd);
-}
-
-TEST(PlasmaSerialization, WaitReply) {
-  int fd = create_temp_file();
-  const int num_objects_in = 2;
-  /* Create a map with two ObjectRequests in it. */
-  ObjectRequestMap objects_in(num_objects_in);
-  ObjectID id1 = random_object_id();
-  objects_in[id1] =
-      ObjectRequest({id1, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectLocation::Local});
-  ObjectID id2 = random_object_id();
-  objects_in[id2] = ObjectRequest(
-      {id2, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectLocation::Nonexistent});
-
-  ARROW_CHECK_OK(SendWaitReply(fd, objects_in, num_objects_in));
-  /* Read message back. */
-  std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaWaitReply);
-  ObjectRequest objects_out[2];
-  int num_objects_out;
-  ARROW_CHECK_OK(
-      ReadWaitReply(data.data(), data.size(), &objects_out[0], &num_objects_out));
-  ASSERT_EQ(num_objects_in, num_objects_out);
-  for (int i = 0; i < num_objects_out; i++) {
-    /* Each object request must appear exactly once. */
-    ASSERT_EQ(objects_in.count(objects_out[i].object_id), 1);
-    const auto& entry = objects_in.find(objects_out[i].object_id);
-    ASSERT_TRUE(entry != objects_in.end());
-    ASSERT_EQ(entry->second.object_id, objects_out[i].object_id);
-    ASSERT_EQ(entry->second.location, objects_out[i].location);
-  }
-  close(fd);
-}
-
 TEST(PlasmaSerialization, DataRequest) {
   int fd = create_temp_file();
   ObjectID object_id1 = random_object_id();
diff --git a/docs/source/python/plasma.rst b/docs/source/python/plasma.rst
index 3df68ef..660c5fb 100644
--- a/docs/source/python/plasma.rst
+++ b/docs/source/python/plasma.rst
@@ -60,7 +60,7 @@ socket name:
 .. code-block:: python
 
   import pyarrow.plasma as plasma
-  client = plasma.connect("/tmp/plasma", "")
+  client = plasma.connect("/tmp/plasma")
 
 If the following error occurs from running the above Python code, that
 means that either the socket given is incorrect, or the ``./plasma_store`` is
@@ -68,7 +68,7 @@ not currently running. Check to see if the Plasma store is still running.
 
 .. code-block:: shell
 
-  >>> client = plasma.connect("/tmp/plasma", "")
+  >>> client = plasma.connect("/tmp/plasma")
   Connection to socket failed for pathname /tmp/plasma
   Could not connect to socket /tmp/plasma
 
@@ -179,7 +179,7 @@ the object buffer.
 
   # Create a different client. Note that this second client could be
   # created in the same or in a separate, concurrent Python session.
-  client2 = plasma.connect("/tmp/plasma", "")
+  client2 = plasma.connect("/tmp/plasma")
 
   # Get the object in the second client. This blocks until the object has been sealed.
   object_id2 = plasma.ObjectID(20 * b"a")
@@ -221,7 +221,7 @@ of the object info might change in the future):
   import pyarrow.plasma as plasma
   import time
 
-  client = plasma.connect("/tmp/plasma", "")
+  client = plasma.connect("/tmp/plasma")
 
   client.put("hello, world")
   # Sleep a little so we get different creation times
@@ -452,7 +452,7 @@ You can test this with the following script:
   import pyarrow.plasma as plasma
   import time
 
-  client = plasma.connect("/tmp/plasma", "")
+  client = plasma.connect("/tmp/plasma")
 
   data = np.random.randn(100000000)
   tensor = pa.Tensor.from_numpy(data)
diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
index 3b67bc0..8d6eec0 100644
--- a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
+++ b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
@@ -80,16 +80,6 @@ public interface ObjectStoreLink {
   List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs);
 
   /**
-   * Wait until <tt>numReturns</tt> objects in <tt>objectIds</tt> are ready.
-   *
-   * @param objectIds List of object IDs to wait for.
-   * @param timeoutMs Return to the caller after <tt>timeoutMs</tt> milliseconds.
-   * @param numReturns We are waiting for this number of objects to be ready.
-   * @return List of object IDs that are ready
-   */
-  List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns);
-
-  /**
    * Compute the hash of an object in the object store.
    *
    * @param objectId The object ID used to identify the object.
@@ -99,23 +89,6 @@ public interface ObjectStoreLink {
   byte[] hash(byte[] objectId);
 
   /**
-   * Fetch the object with the given ID from other plasma manager instances.
-   *
-   * @param objectId The object ID used to identify the object.
-   */
-  default void fetch(byte[] objectId) {
-    byte[][] objectIds = {objectId};
-    fetch(objectIds);
-  }
-
-  /**
-   * Fetch the objects with the given IDs from other plasma manager instances.
-   *
-   * @param objectIds List of object IDs used to identify the objects.
-   */
-  void fetch(byte[][] objectIds);
-
-  /**
    * Evict some objects to recover given count of bytes.
    *
    * @param numBytes The number of bytes to attempt to recover.
diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
index db1f35e..d69b54d 100644
--- a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
+++ b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
@@ -82,34 +82,11 @@ public class PlasmaClient implements ObjectStoreLink {
   }
 
   @Override
-  public List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns) {
-    byte[][] readys = PlasmaClientJNI.wait(conn, objectIds, timeoutMs, numReturns);
-
-    List<byte[]> ret = new ArrayList<>();
-    for (byte[] ready : readys) {
-      for (byte[] id : objectIds) {
-        if (Arrays.equals(ready, id)) {
-          ret.add(id);
-          break;
-        }
-      }
-    }
-
-    assert (ret.size() == readys.length);
-    return ret;
-  }
-
-  @Override
   public byte[] hash(byte[] objectId) {
     return PlasmaClientJNI.hash(conn, objectId);
   }
 
   @Override
-  public void fetch(byte[][] objectIds) {
-    PlasmaClientJNI.fetch(conn, objectIds);
-  }
-
-  @Override
   public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
     ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs);
     assert bufs.length == objectIds.length;
diff --git a/python/benchmarks/plasma.py b/python/benchmarks/plasma.py
index 7cefcdf..398ec72 100644
--- a/python/benchmarks/plasma.py
+++ b/python/benchmarks/plasma.py
@@ -32,7 +32,7 @@ class SimplePlasmaThroughput(object):
         self.plasma_store_ctx = plasma.start_plasma_store(
             plasma_store_memory=10**9)
         plasma_store_name, p = self.plasma_store_ctx.__enter__()
-        self.plasma_client = plasma.connect(plasma_store_name, "", 64)
+        self.plasma_client = plasma.connect(plasma_store_name)
 
         self.data = np.random.randn(size // 8)
 
@@ -52,7 +52,7 @@ class SimplePlasmaLatency(object):
         self.plasma_store_ctx = plasma.start_plasma_store(
             plasma_store_memory=10**9)
         plasma_store_name, p = self.plasma_store_ctx.__enter__()
-        self.plasma_client = plasma.connect(plasma_store_name, "", 64)
+        self.plasma_client = plasma.connect(plasma_store_name)
 
     def teardown(self):
         self.plasma_store_ctx.__exit__(None, None, None)
diff --git a/python/examples/plasma/sorting/sort_df.py b/python/examples/plasma/sorting/sort_df.py
index 2e4df58..2a51759 100644
--- a/python/examples/plasma/sorting/sort_df.py
+++ b/python/examples/plasma/sorting/sort_df.py
@@ -49,7 +49,7 @@ column_to_sort = column_names[0]
 # Connect to clients
 def connect():
     global client
-    client = plasma.connect('/tmp/store', '', 0)
+    client = plasma.connect('/tmp/store')
     np.random.seed(int(time.time() * 10e7) % 10000000)
 
 
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index f7db3b4..cfaa39c 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -63,11 +63,6 @@ cdef extern from "plasma/common.h" nogil:
         @staticmethod
         int64_t size()
 
-    cdef struct CObjectRequest" plasma::ObjectRequest":
-        CUniqueID object_id
-        int type
-        int location
-
     cdef enum CObjectState" plasma::ObjectState":
         PLASMA_CREATED" plasma::ObjectState::PLASMA_CREATED"
         PLASMA_SEALED" plasma::ObjectState::PLASMA_SEALED"
@@ -92,14 +87,6 @@ cdef extern from "plasma/common.h" nogil:
 cdef extern from "plasma/common.h":
     cdef int64_t kDigestSize" plasma::kDigestSize"
 
-    cdef enum ObjectRequestType:
-        PLASMA_QUERY_LOCAL"plasma::ObjectRequestType::PLASMA_QUERY_LOCAL",
-        PLASMA_QUERY_ANYWHERE"plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE"
-
-    cdef enum ObjectLocation:
-        ObjectStatusLocal"plasma::ObjectLocation::Local"
-        ObjectStatusRemote"plasma::ObjectLocation::Remote"
-
 cdef extern from "plasma/client.h" nogil:
 
     cdef cppclass CPlasmaClient" plasma::PlasmaClient":
@@ -143,16 +130,6 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Disconnect()
 
-        CStatus Fetch(int num_object_ids, const CUniqueID* object_ids)
-
-        CStatus Wait(int64_t num_object_requests,
-                     CObjectRequest* object_requests,
-                     int num_ready_objects, int64_t timeout_ms,
-                     int* num_objects_ready)
-
-        CStatus Transfer(const char* addr, int port,
-                         const CUniqueID& object_id)
-
         CStatus Delete(const c_vector[CUniqueID] object_ids)
 
 cdef extern from "plasma/client.h" nogil:
@@ -285,13 +262,11 @@ cdef class PlasmaClient:
         shared_ptr[CPlasmaClient] client
         int notification_fd
         c_string store_socket_name
-        c_string manager_socket_name
 
     def __cinit__(self):
         self.client.reset(new CPlasmaClient())
         self.notification_fd = -1
         self.store_socket_name = b""
-        self.manager_socket_name = b""
 
     cdef _get_object_buffers(self, object_ids, int64_t timeout_ms,
                              c_vector[CObjectBuffer]* result):
@@ -315,10 +290,6 @@ cdef class PlasmaClient:
     def store_socket_name(self):
         return self.store_socket_name.decode()
 
-    @property
-    def manager_socket_name(self):
-        return self.manager_socket_name.decode()
-
     def create(self, ObjectID object_id, int64_t data_size,
                c_string metadata=b""):
         """
@@ -642,95 +613,6 @@ cdef class PlasmaClient:
             check_status(self.client.get().Evict(num_bytes, num_bytes_evicted))
         return num_bytes_evicted
 
-    def transfer(self, address, int port, ObjectID object_id):
-        """
-        Transfer local object with id object_id to another plasma instance
-
-        Parameters
-        ----------
-        addr : str
-            IPv4 address of the plasma instance the object is sent to.
-        port : int
-            Port number of the plasma instance the object is sent to.
-        object_id : str
-            A string used to identify an object.
-        """
-        cdef c_string addr = address.encode()
-        with nogil:
-            check_status(self.client.get()
-                         .Transfer(addr.c_str(), port, object_id.data))
-
-    def fetch(self, object_ids):
-        """
-        Fetch the objects with the given IDs from other plasma managers.
-
-        Parameters
-        ----------
-        object_ids : list
-            A list of strings used to identify the objects.
-        """
-        cdef c_vector[CUniqueID] ids
-        cdef ObjectID object_id
-        for object_id in object_ids:
-            ids.push_back(object_id.data)
-        with nogil:
-            check_status(self.client.get().Fetch(ids.size(), ids.data()))
-
-    def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT,
-             int num_returns=1):
-        """
-        Wait until num_returns objects in object_ids are ready.
-        Currently, the object ID arguments to wait must be unique.
-
-        Parameters
-        ----------
-        object_ids : list
-            List of object IDs to wait for.
-        timeout :int
-            Return to the caller after timeout milliseconds.
-        num_returns : int
-            We are waiting for this number of objects to be ready.
-
-        Returns
-        -------
-        list
-            List of object IDs that are ready.
-        list
-            List of object IDs we might still wait on.
-        """
-        # Check that the object ID arguments are unique. The plasma manager
-        # currently crashes if given duplicate object IDs.
-        if len(object_ids) != len(set(object_ids)):
-            raise Exception("Wait requires a list of unique object IDs.")
-        cdef int64_t num_object_requests = len(object_ids)
-        cdef c_vector[CObjectRequest] object_requests = (
-            c_vector[CObjectRequest](num_object_requests))
-        cdef int num_objects_ready = 0
-        cdef ObjectID object_id
-        for i, object_id in enumerate(object_ids):
-            object_requests[i].object_id = object_id.data
-            object_requests[i].type = PLASMA_QUERY_ANYWHERE
-        with nogil:
-            check_status(self.client.get().Wait(num_object_requests,
-                                                object_requests.data(),
-                                                num_returns, timeout,
-                                                &num_objects_ready))
-        cdef int num_to_return = min(num_objects_ready, num_returns)
-        ready_ids = []
-        waiting_ids = set(object_ids)
-        cdef int num_returned = 0
-        for i in range(len(object_ids)):
-            if num_returned == num_to_return:
-                break
-            if (object_requests[i].location == ObjectStatusLocal or
-                    object_requests[i].location == ObjectStatusRemote):
-                ready_ids.append(
-                    ObjectID(object_requests[i].object_id.binary()))
-                waiting_ids.discard(
-                    ObjectID(object_requests[i].object_id.binary()))
-                num_returned += 1
-        return ready_ids, list(waiting_ids)
-
     def subscribe(self):
         """Subscribe to notifications about sealed objects."""
         with nogil:
@@ -873,7 +755,7 @@ cdef class PlasmaClient:
         return result
 
 
-def connect(store_socket_name, manager_socket_name, int release_delay=0,
+def connect(store_socket_name, manager_socket_name=None, int release_delay=0,
             int num_retries=-1):
     """
     Return a new PlasmaClient that is connected a plasma store and
@@ -884,22 +766,24 @@ def connect(store_socket_name, manager_socket_name, int release_delay=0,
     store_socket_name : str
         Name of the socket the plasma store is listening at.
     manager_socket_name : str
-        Name of the socket the plasma manager is listening at.
+        This parameter is deprecated and has no effect.
     release_delay : int
         This parameter is deprecated and has no effect.
     num_retries : int, default -1
         Number of times to try to connect to plasma store. Default value of -1
         uses the default (50)
     """
+    if manager_socket_name is not None:
+        warnings.warn(
+            "manager_socket_name in PlasmaClient.connect is deprecated",
+            FutureWarning)
     cdef PlasmaClient result = PlasmaClient()
     result.store_socket_name = store_socket_name.encode()
-    result.manager_socket_name = manager_socket_name.encode()
     if release_delay != 0:
         warnings.warn("release_delay in PlasmaClient.connect is deprecated",
                       FutureWarning)
     with nogil:
         check_status(result.client.get()
-                     .Connect(result.store_socket_name,
-                              result.manager_socket_name,
+                     .Connect(result.store_socket_name, b"",
                               release_delay, num_retries))
     return result
diff --git a/python/pyarrow/tensorflow/plasma_op.cc b/python/pyarrow/tensorflow/plasma_op.cc
index 4e6449a..852be33 100644
--- a/python/pyarrow/tensorflow/plasma_op.cc
+++ b/python/pyarrow/tensorflow/plasma_op.cc
@@ -71,13 +71,10 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
   explicit TensorToPlasmaOp(tf::OpKernelConstruction* context) : tf::AsyncOpKernel(context) {
     OP_REQUIRES_OK(context, context->GetAttr("plasma_store_socket_name",
                                              &plasma_store_socket_name_));
-    OP_REQUIRES_OK(context, context->GetAttr("plasma_manager_socket_name",
-                                             &plasma_manager_socket_name_));
     tf::mutex_lock lock(mu_);
     if (!connected_) {
       VLOG(1) << "Connecting to Plasma...";
-      ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                     plasma_manager_socket_name_));
+      ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_));
       VLOG(1) << "Connected!";
       connected_ = true;
     }
@@ -226,7 +223,6 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
 
  private:
   std::string plasma_store_socket_name_;
-  std::string plasma_manager_socket_name_;
 
   tf::mutex mu_;
   bool connected_ = false;
@@ -243,13 +239,10 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
   explicit PlasmaToTensorOp(tf::OpKernelConstruction* context) : tf::AsyncOpKernel(context) {
     OP_REQUIRES_OK(context, context->GetAttr("plasma_store_socket_name",
                                              &plasma_store_socket_name_));
-    OP_REQUIRES_OK(context, context->GetAttr("plasma_manager_socket_name",
-                                             &plasma_manager_socket_name_));
     tf::mutex_lock lock(mu_);
     if (!connected_) {
       VLOG(1) << "Connecting to Plasma...";
-      ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                     plasma_manager_socket_name_));
+      ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_));
       VLOG(1) << "Connected!";
       connected_ = true;
     }
@@ -364,7 +357,6 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
 
  private:
   std::string plasma_store_socket_name_;
-  std::string plasma_manager_socket_name_;
 
   tf::mutex mu_;
   bool connected_ = false;
@@ -375,8 +367,7 @@ REGISTER_OP("TensorToPlasma")
     .Input("input_tensor: dtypes")
     .Input("plasma_object_id: string")
     .Attr("dtypes: list(type)")
-    .Attr("plasma_store_socket_name: string")
-    .Attr("plasma_manager_socket_name: string");
+    .Attr("plasma_store_socket_name: string");
 
 REGISTER_KERNEL_BUILDER(Name("TensorToPlasma").Device(tf::DEVICE_CPU),
                         TensorToPlasmaOp<CPUDevice>);
@@ -389,8 +380,7 @@ REGISTER_OP("PlasmaToTensor")
     .Input("plasma_object_id: string")
     .Output("tensor: dtype")
     .Attr("dtype: type")
-    .Attr("plasma_store_socket_name: string")
-    .Attr("plasma_manager_socket_name: string");
+    .Attr("plasma_store_socket_name: string");
 
 REGISTER_KERNEL_BUILDER(Name("PlasmaToTensor").Device(tf::DEVICE_CPU),
                         PlasmaToTensorOp<CPUDevice>);
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 66449e6..05375d7 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -121,8 +121,8 @@ class TestPlasmaClient(object):
             use_one_memory_mapped_file=use_one_memory_mapped_file)
         self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
         # Connect to Plasma.
-        self.plasma_client = plasma.connect(self.plasma_store_name, "")
-        self.plasma_client2 = plasma.connect(self.plasma_store_name, "")
+        self.plasma_client = plasma.connect(self.plasma_store_name)
+        self.plasma_client2 = plasma.connect(self.plasma_store_name)
 
     def teardown_method(self, test_method):
         try:
@@ -147,7 +147,7 @@ class TestPlasmaClient(object):
         import pyarrow.plasma as plasma
         # ARROW-1264
         with pytest.raises(IOError):
-            plasma.connect('unknown-store-name', '', 0, 1)
+            plasma.connect('unknown-store-name', num_retries=1)
 
     def test_create(self):
         # Create an object id string.
@@ -860,7 +860,7 @@ class TestPlasmaClient(object):
         object_id = random_object_id()
 
         def client_blocked_in_get(plasma_store_name):
-            client = plasma.connect(self.plasma_store_name, "", 0)
+            client = plasma.connect(self.plasma_store_name)
             # Try to get an object ID that doesn't exist. This should block.
             client.get([object_id])
 
@@ -889,7 +889,7 @@ class TestPlasmaClient(object):
         object_ids = [random_object_id() for _ in range(10)]
 
         def client_get_multiple(plasma_store_name):
-            client = plasma.connect(self.plasma_store_name, "", 0)
+            client = plasma.connect(self.plasma_store_name)
             # Try to get an object ID that doesn't exist. This should block.
             client.get(object_ids)
 
@@ -948,7 +948,7 @@ def test_use_huge_pages():
             plasma_store_memory=2*10**9,
             plasma_directory="/mnt/hugepages",
             use_hugepages=True) as (plasma_store_name, p):
-        plasma_client = plasma.connect(plasma_store_name, "")
+        plasma_client = plasma.connect(plasma_store_name)
         create_object(plasma_client, 10**8)
 
 
@@ -962,7 +962,7 @@ def test_plasma_client_sharing():
     with plasma.start_plasma_store(
             plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
             as (plasma_store_name, p):
-        plasma_client = plasma.connect(plasma_store_name, "")
+        plasma_client = plasma.connect(plasma_store_name)
         object_id = plasma_client.put(np.zeros(3))
         buf = plasma_client.get(object_id)
         del plasma_client
@@ -977,7 +977,7 @@ def test_plasma_list():
     with plasma.start_plasma_store(
             plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
             as (plasma_store_name, p):
-        plasma_client = plasma.connect(plasma_store_name, "", 0)
+        plasma_client = plasma.connect(plasma_store_name)
 
         # Test sizes
         u, _, _ = create_object(plasma_client, 11, metadata_size=7, seal=False)
diff --git a/python/pyarrow/tests/test_plasma_tf_op.py b/python/pyarrow/tests/test_plasma_tf_op.py
index 51e8b28..e239055 100644
--- a/python/pyarrow/tests/test_plasma_tf_op.py
+++ b/python/pyarrow/tests/test_plasma_tf_op.py
@@ -37,15 +37,13 @@ def run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
         return plasma.tf_plasma_op.tensor_to_plasma(
             [data_tensor, ones_tensor],
             object_id,
-            plasma_store_socket_name=plasma_store_name,
-            plasma_manager_socket_name="")
+            plasma_store_socket_name=plasma_store_name)
 
     def FromPlasma():
         return plasma.tf_plasma_op.plasma_to_tensor(
             object_id,
             dtype=tf.as_dtype(dtype),
-            plasma_store_socket_name=plasma_store_name,
-            plasma_manager_socket_name="")
+            plasma_store_socket_name=plasma_store_name)
 
     with tf.device(FORCE_DEVICE):
         to_plasma = ToPlasma()
@@ -94,7 +92,7 @@ def test_plasma_tf_op(use_gpu=False):
         pytest.skip("TensorFlow Op not found")
 
     with plasma.start_plasma_store(10**8) as (plasma_store_name, p):
-        client = plasma.connect(plasma_store_name, "")
+        client = plasma.connect(plasma_store_name)
         for dtype in [np.float32, np.float64,
                       np.int8, np.int16, np.int32, np.int64]:
             run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,