You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/12/14 15:27:19 UTC
[arrow] branch master updated: ARROW-4015: [Plasma] remove unused interfaces for plasma manager
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 8c41303 ARROW-4015: [Plasma] remove unused interfaces for plasma manager
8c41303 is described below
commit 8c413036775796d9bcc52be56373bbb45de8c0ae
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Fri Dec 14 07:27:08 2018 -0800
ARROW-4015: [Plasma] remove unused interfaces for plasma manager
https://github.com/apache/arrow/issues/3154
This removes the unused Plasma interfaces Fetch(), Wait(), Transfer(), and Info(). They depend on the plasma manager, which has already been removed from Ray.
Author: Philipp Moritz <pc...@gmail.com>
Author: Zhijun Fu <pi...@antfin.com>
Author: Robert Nishihara <ro...@gmail.com>
Closes #3167 from zhijunfu/remove-legacy-interfaces and squashes the following commits:
0efb5005f <Philipp Moritz> fix tensorflow op
be92e9085 <Philipp Moritz> fix java client
9da2cd38b <Philipp Moritz> Update _plasma.pyx
16ec63e9a <Robert Nishihara> More updates
e7413f739 <Robert Nishihara> Update _plasma.pyx
21398b5e7 <Zhijun Fu> merge
bcb320400 <Zhijun Fu> address comments
7967aea09 <Philipp Moritz> Merge branch 'master' into remove-legacy-interfaces
583cd97c4 <Zhijun Fu> ARROW-4015: remove unused interfaces for plasma manager
---
c_glib/plasma-glib/client.cpp | 3 +-
cpp/apidoc/tutorials/plasma.md | 8 +-
cpp/apidoc/tutorials/tensor_to_py.md | 2 +-
cpp/src/plasma/client.cc | 111 +---------------
cpp/src/plasma/client.h | 100 +-------------
cpp/src/plasma/common.cc | 3 -
cpp/src/plasma/common.h | 24 ----
cpp/src/plasma/format/plasma.fbs | 74 -----------
.../org_apache_arrow_plasma_PlasmaClientJNI.cc | 73 -----------
cpp/src/plasma/plasma.h | 3 -
cpp/src/plasma/protocol.cc | 143 ---------------------
cpp/src/plasma/protocol.h | 35 -----
cpp/src/plasma/test/client_tests.cc | 2 -
cpp/src/plasma/test/serialization_tests.cc | 116 -----------------
docs/source/python/plasma.rst | 10 +-
.../org/apache/arrow/plasma/ObjectStoreLink.java | 27 ----
.../java/org/apache/arrow/plasma/PlasmaClient.java | 23 ----
python/benchmarks/plasma.py | 4 +-
python/examples/plasma/sorting/sort_df.py | 2 +-
python/pyarrow/_plasma.pyx | 130 +------------------
python/pyarrow/tensorflow/plasma_op.cc | 18 +--
python/pyarrow/tests/test_plasma.py | 16 +--
python/pyarrow/tests/test_plasma_tf_op.py | 8 +-
23 files changed, 41 insertions(+), 894 deletions(-)
diff --git a/c_glib/plasma-glib/client.cpp b/c_glib/plasma-glib/client.cpp
index c05a710..9591a0a 100644
--- a/c_glib/plasma-glib/client.cpp
+++ b/c_glib/plasma-glib/client.cpp
@@ -41,8 +41,7 @@ G_BEGIN_DECLS
*
* #GPlasmaClientCreateOptions is a class for customizing object creation.
*
- * #GPlasmaClient is a class for an interface with a plasma store
- * and a plasma manager.
+ * #GPlasmaClient is a class for an interface with a plasma store.
*
* Since: 0.12.0
*/
diff --git a/cpp/apidoc/tutorials/plasma.md b/cpp/apidoc/tutorials/plasma.md
index 472d479..b9046d5 100644
--- a/cpp/apidoc/tutorials/plasma.md
+++ b/cpp/apidoc/tutorials/plasma.md
@@ -80,7 +80,7 @@ using namespace plasma;
int main(int argc, char** argv) {
// Start up and connect a Plasma client.
PlasmaClient client;
- ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
+ ARROW_CHECK_OK(client.Connect("/tmp/plasma"));
// Disconnect the Plasma client.
ARROW_CHECK_OK(client.Disconnect());
}
@@ -226,7 +226,7 @@ using namespace plasma;
int main(int argc, char** argv) {
// Start up and connect a Plasma client.
PlasmaClient client;
- ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
+ ARROW_CHECK_OK(client.Connect("/tmp/plasma"));
// Create an object with a fixed ObjectID.
ObjectID object_id = ObjectID::from_binary("00000000000000000000");
int64_t data_size = 1000;
@@ -332,7 +332,7 @@ using namespace plasma;
int main(int argc, char** argv) {
// Start up and connect a Plasma client.
PlasmaClient client;
- ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
+ ARROW_CHECK_OK(client.Connect("/tmp/plasma"));
ObjectID object_id = ObjectID::from_binary("00000000000000000000");
ObjectBuffer object_buffer;
ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer));
@@ -421,7 +421,7 @@ using namespace plasma;
int main(int argc, char** argv) {
// Start up and connect a Plasma client.
PlasmaClient client;
- ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
+ ARROW_CHECK_OK(client.Connect("/tmp/plasma"));
int fd;
ARROW_CHECK_OK(client.Subscribe(&fd));
diff --git a/cpp/apidoc/tutorials/tensor_to_py.md b/cpp/apidoc/tutorials/tensor_to_py.md
index 0be973a..cd191fe 100644
--- a/cpp/apidoc/tutorials/tensor_to_py.md
+++ b/cpp/apidoc/tutorials/tensor_to_py.md
@@ -105,7 +105,7 @@ The `inputs` variable will be a list of Object IDs in their raw byte string form
import pyarrow as pa
import pyarrow.plasma as plasma
-plasma_client = plasma.connect('/tmp/plasma', '', 0)
+plasma_client = plasma.connect('/tmp/plasma')
# inputs: a list of object ids
inputs = [20 * b'1']
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 2dbe2b4..4215399 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -198,17 +198,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status Disconnect();
- Status Fetch(int num_object_ids, const ObjectID* object_ids);
-
- Status Wait(int64_t num_object_requests, ObjectRequest* object_requests,
- int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
-
- Status Transfer(const char* addr, int port, const ObjectID& object_id);
-
- Status Info(const ObjectID& object_id, int* object_status);
-
- int get_manager_fd() const;
-
bool IsInUse(const ObjectID& object_id);
private:
@@ -250,8 +239,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
/// File descriptor of the Unix domain socket that connects to the store.
int store_conn_;
- /// File descriptor of the Unix domain socket that connects to the manager.
- int manager_conn_;
/// Table of dlmalloc buffer files that have been memory mapped so far. This
/// is a hash table mapping a file descriptor to a struct containing the
/// address of the corresponding memory-mapped file.
@@ -872,10 +859,7 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
int release_delay, int num_retries) {
RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
if (manager_socket_name != "") {
- RETURN_NOT_OK(
- ConnectIpcSocketRetry(manager_socket_name, num_retries, -1, &manager_conn_));
- } else {
- manager_conn_ = -1;
+ return Status::NotImplemented("plasma manager is no longer supported");
}
if (release_delay != 0) {
ARROW_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect "
@@ -898,78 +882,6 @@ Status PlasmaClient::Impl::Disconnect() {
// that were in use by us when handling the SIGPIPE.
close(store_conn_);
store_conn_ = -1;
- if (manager_conn_ >= 0) {
- close(manager_conn_);
- manager_conn_ = -1;
- }
- return Status::OK();
-}
-
-Status PlasmaClient::Impl::Transfer(const char* address, int port,
- const ObjectID& object_id) {
- return SendDataRequest(manager_conn_, object_id, address, port);
-}
-
-Status PlasmaClient::Impl::Fetch(int num_object_ids, const ObjectID* object_ids) {
- ARROW_CHECK(manager_conn_ >= 0);
- return SendFetchRequest(manager_conn_, object_ids, num_object_ids);
-}
-
-int PlasmaClient::Impl::get_manager_fd() const { return manager_conn_; }
-
-Status PlasmaClient::Impl::Info(const ObjectID& object_id, int* object_status) {
- ARROW_CHECK(manager_conn_ >= 0);
-
- RETURN_NOT_OK(SendStatusRequest(manager_conn_, &object_id, 1));
- std::vector<uint8_t> buffer;
- RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType::PlasmaStatusReply, &buffer));
- ObjectID id;
- RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1));
- ARROW_CHECK(object_id == id);
- return Status::OK();
-}
-
-Status PlasmaClient::Impl::Wait(int64_t num_object_requests,
- ObjectRequest* object_requests, int num_ready_objects,
- int64_t timeout_ms, int* num_objects_ready) {
- ARROW_CHECK(manager_conn_ >= 0);
- ARROW_CHECK(num_object_requests > 0);
- ARROW_CHECK(num_ready_objects > 0);
- ARROW_CHECK(num_ready_objects <= num_object_requests);
-
- for (int i = 0; i < num_object_requests; ++i) {
- ARROW_CHECK(object_requests[i].type == ObjectRequestType::PLASMA_QUERY_LOCAL ||
- object_requests[i].type == ObjectRequestType::PLASMA_QUERY_ANYWHERE);
- }
-
- RETURN_NOT_OK(SendWaitRequest(manager_conn_, object_requests, num_object_requests,
- num_ready_objects, timeout_ms));
- std::vector<uint8_t> buffer;
- RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType::PlasmaWaitReply, &buffer));
- RETURN_NOT_OK(
- ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects));
-
- *num_objects_ready = 0;
- for (int i = 0; i < num_object_requests; ++i) {
- ObjectRequestType type = object_requests[i].type;
- auto status = static_cast<fb::ObjectStatus>(object_requests[i].location);
- switch (type) {
- case ObjectRequestType::PLASMA_QUERY_LOCAL:
- if (status == fb::ObjectStatus::Local) {
- *num_objects_ready += 1;
- }
- break;
- case ObjectRequestType::PLASMA_QUERY_ANYWHERE:
- if (status == fb::ObjectStatus::Local || status == fb::ObjectStatus::Remote) {
- *num_objects_ready += 1;
- } else {
- ARROW_CHECK(status == fb::ObjectStatus::Nonexistent);
- }
- break;
- default:
- ARROW_LOG(FATAL) << "This code should be unreachable.";
- }
- }
return Status::OK();
}
@@ -1052,27 +964,6 @@ Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_
Status PlasmaClient::Disconnect() { return impl_->Disconnect(); }
-Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) {
- return impl_->Fetch(num_object_ids, object_ids);
-}
-
-Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_requests,
- int num_ready_objects, int64_t timeout_ms,
- int* num_objects_ready) {
- return impl_->Wait(num_object_requests, object_requests, num_ready_objects, timeout_ms,
- num_objects_ready);
-}
-
-Status PlasmaClient::Transfer(const char* addr, int port, const ObjectID& object_id) {
- return impl_->Transfer(addr, port, object_id);
-}
-
-Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
- return impl_->Info(object_id, object_status);
-}
-
-int PlasmaClient::get_manager_fd() const { return impl_->get_manager_fd(); }
-
bool PlasmaClient::IsInUse(const ObjectID& object_id) {
return impl_->IsInUse(object_id);
}
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 514d2bd..ac9e8eb 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -49,19 +49,20 @@ class ARROW_EXPORT PlasmaClient {
PlasmaClient();
~PlasmaClient();
- /// Connect to the local plasma store and plasma manager. Return
- /// the resulting connection.
+ /// Connect to the local plasma store. Return the resulting connection.
///
/// \param store_socket_name The name of the UNIX domain socket to use to
/// connect to the Plasma store.
/// \param manager_socket_name The name of the UNIX domain socket to use to
/// connect to the local Plasma manager. If this is "", then this
/// function will not connect to a manager.
+ /// Note that plasma manager is no longer supported, this function
+ /// will return failure if this is not "".
/// \param release_delay Deprecated (not used).
/// \param num_retries number of attempts to connect to IPC socket, default 50
/// \return The return status.
Status Connect(const std::string& store_socket_name,
- const std::string& manager_socket_name, int release_delay = 0,
+ const std::string& manager_socket_name = "", int release_delay = 0,
int num_retries = -1);
/// Create an object in the Plasma Store. Any metadata for this object must be
@@ -249,99 +250,6 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status.
Status Disconnect();
- /// Attempt to initiate the transfer of some objects from remote Plasma
- /// Stores.
- /// This method does not guarantee that the fetched objects will arrive
- /// locally.
- ///
- /// For an object that is available in the local Plasma Store, this method
- /// will
- /// not do anything. For an object that is not available locally, it will
- /// check
- /// if the object are already being fetched. If so, it will not do anything.
- /// If
- /// not, it will query the object table for a list of Plasma Managers that
- /// have
- /// the object. The object table will return a non-empty list, and this Plasma
- /// Manager will attempt to initiate transfers from one of those Plasma
- /// Managers.
- ///
- /// This function is non-blocking.
- ///
- /// This method is idempotent in the sense that it is ok to call it multiple
- /// times.
- ///
- /// \param num_object_ids The number of object IDs fetch is being called on.
- /// \param object_ids The IDs of the objects that fetch is being called on.
- /// \return The return status.
- Status Fetch(int num_object_ids, const ObjectID* object_ids);
-
- /// Wait for (1) a specified number of objects to be available (sealed) in the
- /// local Plasma Store or in a remote Plasma Store, or (2) for a timeout to
- /// expire. This is a blocking call.
- ///
- /// \param num_object_requests Size of the object_requests array.
- /// \param object_requests Object event array. Each element contains a request
- /// for a particular object_id. The type of request is specified in the
- /// "type" field.
- /// - A PLASMA_QUERY_LOCAL request is satisfied when object_id becomes
- /// available in the local Plasma Store. In this case, this function
- /// sets the "status" field to ObjectStatus::Local. Note, if the
- /// status
- /// is not ObjectStatus::Local, it will be ObjectStatus::Nonexistent,
- /// but it may exist elsewhere in the system.
- /// - A PLASMA_QUERY_ANYWHERE request is satisfied when object_id
- /// becomes
- /// available either at the local Plasma Store or on a remote Plasma
- /// Store. In this case, the functions sets the "status" field to
- /// ObjectStatus::Local or ObjectStatus::Remote.
- /// \param num_ready_objects The number of requests in object_requests array
- /// that
- /// must be satisfied before the function returns, unless it timeouts.
- /// The num_ready_objects should be no larger than num_object_requests.
- /// \param timeout_ms Timeout value in milliseconds. If this timeout expires
- /// before min_num_ready_objects of requests are satisfied, the
- /// function
- /// returns.
- /// \param num_objects_ready Out parameter for number of satisfied requests in
- /// the object_requests list. If the returned number is less than
- /// min_num_ready_objects this means that timeout expired.
- /// \return The return status.
- Status Wait(int64_t num_object_requests, ObjectRequest* object_requests,
- int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
-
- /// Transfer local object to a different plasma manager.
- ///
- /// \param addr IP address of the plasma manager we are transfering to.
- /// \param port Port of the plasma manager we are transfering to.
- /// \param object_id ObjectID of the object we are transfering.
- /// \return The return status.
- Status Transfer(const char* addr, int port, const ObjectID& object_id);
-
- /// Return the status of a given object. This method may query the object
- /// table.
- ///
- /// \param object_id The ID of the object whose status we query.
- /// \param object_status Out parameter for object status. Can take the
- /// following values.
- /// - PLASMA_CLIENT_LOCAL, if object is stored in the local Plasma
- /// Store.
- /// has been already scheduled by the Plasma Manager.
- /// - PLASMA_CLIENT_TRANSFER, if the object is either currently being
- /// transferred or just scheduled.
- /// - PLASMA_CLIENT_REMOTE, if the object is stored at a remote
- /// Plasma Store.
- /// - PLASMA_CLIENT_DOES_NOT_EXIST, if the object doesn’t exist in the
- /// system.
- /// \return The return status.
- Status Info(const ObjectID& object_id, int* object_status);
-
- /// Get the file descriptor for the socket connection to the plasma manager.
- ///
- /// \return The file descriptor for the manager connection. If there is no
- /// connection to the manager, this is -1.
- int get_manager_fd() const;
-
private:
friend class PlasmaBuffer;
FRIEND_TEST(TestPlasmaStore, GetTest);
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index 0ca17cf..1b86fd8 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -107,9 +107,6 @@ bool UniqueID::operator==(const UniqueID& rhs) const {
return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
}
-ARROW_EXPORT fb::ObjectStatus ObjectStatusLocal = fb::ObjectStatus::Local;
-ARROW_EXPORT fb::ObjectStatus ObjectStatusRemote = fb::ObjectStatus::Remote;
-
const PlasmaStoreInfo* plasma_config;
} // namespace plasma
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 7090428..38925fe 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -66,30 +66,6 @@ typedef UniqueID ObjectID;
/// Size of object hash digests.
constexpr int64_t kDigestSize = sizeof(uint64_t);
-enum class ObjectRequestType : int {
- /// Query for object in the local plasma store.
- PLASMA_QUERY_LOCAL = 1,
- /// Query for object in the local plasma store or in a remote plasma store.
- PLASMA_QUERY_ANYWHERE
-};
-
-/// Object request data structure. Used for Wait.
-struct ObjectRequest {
- /// The ID of the requested object. If ID_NIL request any object.
- ObjectID object_id;
- /// Request associated to the object. It can take one of the following values:
- /// - PLASMA_QUERY_LOCAL: return if or when the object is available in the
- /// local Plasma Store.
- /// - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
- /// the system (i.e., either in the local or a remote Plasma Store).
- ObjectRequestType type;
- /// Object location. This can be
- /// - ObjectLocation::Local: object is ready at the local Plasma Store.
- /// - ObjectLocation::Remote: object is ready at a remote Plasma Store.
- /// - ObjectLocation::Nonexistent: object does not exist in the system.
- ObjectLocation location;
-};
-
enum class ObjectState : int {
/// Object was created but not sealed in the local Plasma Store.
PLASMA_CREATED = 1,
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index ef934fb..b3c8903 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -42,9 +42,6 @@ enum MessageType:long {
// Delete an object.
PlasmaDeleteRequest,
PlasmaDeleteReply,
- // Get status of an object.
- PlasmaStatusRequest,
- PlasmaStatusReply,
// See if the store contains an object (will be deprecated).
PlasmaContainsRequest,
PlasmaContainsReply,
@@ -57,11 +54,6 @@ enum MessageType:long {
// Make room for new objects in the plasma store.
PlasmaEvictRequest,
PlasmaEvictReply,
- // Fetch objects from remote Plasma stores.
- PlasmaFetchRequest,
- // Wait for objects to be ready either from local or remote Plasma stores.
- PlasmaWaitRequest,
- PlasmaWaitReply,
// Subscribe to a list of objects or to all objects.
PlasmaSubscribeRequest,
// Unsubscribe.
@@ -239,35 +231,6 @@ table PlasmaDeleteReply {
errors: [PlasmaError];
}
-table PlasmaStatusRequest {
- // IDs of the objects stored at local Plasma store we request the status of.
- object_ids: [string];
-}
-
-enum ObjectStatus:int {
- // Object is stored in the local Plasma Store.
- Local,
- // Object is stored on a remote Plasma store, and it is not stored on the
- // local Plasma Store.
- Remote,
- // Object is not stored in the system.
- Nonexistent,
- // Object is currently transferred from a remote Plasma store the local
- // Plasma Store.
- Transfer
-}
-
-table PlasmaStatusReply {
- // IDs of the objects being returned.
- object_ids: [string];
- // Status of the object.
- status: [ObjectStatus];
-}
-
-// PlasmaContains is a subset of PlasmaStatus which does not
-// involve the plasma manager, only the store. We should consider
-// unifying them in the future and deprecating PlasmaContains.
-
table PlasmaContainsRequest {
// ID of the object we are querying.
object_id: string;
@@ -309,43 +272,6 @@ table PlasmaEvictReply {
num_bytes: ulong;
}
-table PlasmaFetchRequest {
- // IDs of objects to be gotten.
- object_ids: [string];
-}
-
-table ObjectRequestSpec {
- // ID of the object.
- object_id: string;
- // The type of the object. This specifies whether we
- // will be waiting for an object store in the local or
- // global Plasma store.
- type: int;
-}
-
-table PlasmaWaitRequest {
- // Array of object requests whose status we are asking for.
- object_requests: [ObjectRequestSpec];
- // Number of objects expected to be returned, if available.
- num_ready_objects: int;
- // timeout
- timeout: long;
-}
-
-table ObjectReply {
- // ID of the object.
- object_id: string;
- // The object status. This specifies where the object is stored.
- status: ObjectStatus;
-}
-
-table PlasmaWaitReply {
- // Array of object requests being returned.
- object_requests: [ObjectReply];
- // Number of objects expected to be returned, if available.
- num_ready_objects: int;
-}
-
table PlasmaSubscribeRequest {
}
diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
index 7cd2f35..fa376ec 100644
--- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
+++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
@@ -220,79 +220,6 @@ JNIEXPORT jboolean JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_contains
return has_object;
}
-JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_fetch(
- JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids) {
- plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);
- jsize num_oids = env->GetArrayLength(object_ids);
-
- std::vector<plasma::ObjectID> oids(num_oids);
- for (int i = 0; i < num_oids; ++i) {
- jbyteArray_to_object_id(
- env, reinterpret_cast<jbyteArray>(env->GetObjectArrayElement(object_ids, i)),
- &oids[i]);
- }
-
- ARROW_CHECK_OK(client->Fetch(static_cast<int>(num_oids), oids.data()));
-
- return;
-}
-
-JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_wait(
- JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids, jint timeout_ms,
- jint num_returns) {
- plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);
- jsize num_oids = env->GetArrayLength(object_ids);
-
- if (num_returns < 0) {
- jclass Exception = env->FindClass("java/lang/RuntimeException");
- env->ThrowNew(Exception, "The argument num_returns cannot be less than zero.");
- return nullptr;
- }
- if (num_returns > num_oids) {
- jclass Exception = env->FindClass("java/lang/RuntimeException");
- env->ThrowNew(Exception,
- "The argument num_returns cannot be greater than len(object_ids).");
- return nullptr;
- }
-
- std::vector<plasma::ObjectRequest> oreqs(num_oids);
-
- for (int i = 0; i < num_oids; ++i) {
- jbyteArray_to_object_id(
- env, reinterpret_cast<jbyteArray>(env->GetObjectArrayElement(object_ids, i)),
- &oreqs[i].object_id);
- oreqs[i].type = plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE;
- }
-
- int num_return_objects;
- // TODO: may be blocked. consider to add the thread support
- ARROW_CHECK_OK(client->Wait(static_cast<int>(num_oids), oreqs.data(), num_returns,
- static_cast<uint64_t>(timeout_ms), &num_return_objects));
-
- int num_to_return = std::min(num_return_objects, num_returns);
- jclass clsByteArray = env->FindClass("[B");
- jobjectArray ret = env->NewObjectArray(num_to_return, clsByteArray, nullptr);
-
- int num_returned = 0;
- jbyteArray oid = nullptr;
- for (int i = 0; i < num_oids; ++i) {
- if (num_returned >= num_to_return) {
- break;
- }
-
- if (oreqs[i].location == plasma::ObjectLocation::Local ||
- oreqs[i].location == plasma::ObjectLocation::Remote) {
- oid = env->NewByteArray(OBJECT_ID_SIZE);
- object_id_to_jbyteArray(env, oid, &oreqs[i].object_id);
- env->SetObjectArrayElement(ret, num_returned, oid);
- num_returned++;
- }
- }
- ARROW_CHECK(num_returned == num_to_return);
-
- return ret;
-}
-
JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_evict(
JNIEnv* env, jclass cls, jlong conn, jlong num_bytes) {
plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 83caec7..aafe527 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -68,9 +68,6 @@ constexpr int64_t kBlockSize = 64;
struct Client;
-/// Mapping from object IDs to type and status of the request.
-typedef std::unordered_map<ObjectID, ObjectRequest> ObjectRequestMap;
-
// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
struct PlasmaObject {
#ifdef PLASMA_CUDA
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index c437840..a878647 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -42,10 +42,6 @@ using flatbuffers::uoffset_t;
#define PLASMA_CHECK_ENUM(x, y) \
static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")
-PLASMA_CHECK_ENUM(ObjectLocation::Local, fb::ObjectStatus::Local);
-PLASMA_CHECK_ENUM(ObjectLocation::Remote, fb::ObjectStatus::Remote);
-PLASMA_CHECK_ENUM(ObjectLocation::Nonexistent, fb::ObjectStatus::Nonexistent);
-
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
int64_t num_objects) {
@@ -367,56 +363,6 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object
return Status::OK();
}
-// Satus messages.
-
-Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message =
- fb::CreatePlasmaStatusRequest(fbb, ToFlatbuffer(&fbb, object_ids, num_objects));
- return PlasmaSend(sock, MessageType::PlasmaStatusRequest, &fbb, message);
-}
-
-Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
- int64_t num_objects) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<fb::PlasmaStatusRequest>(data);
- DCHECK(VerifyFlatbuffer(message, data, size));
- for (uoffset_t i = 0; i < num_objects; ++i) {
- object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
- }
- return Status::OK();
-}
-
-Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
- int64_t num_objects) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message =
- fb::CreatePlasmaStatusReply(fbb, ToFlatbuffer(&fbb, object_ids, num_objects),
- fbb.CreateVector(object_status, num_objects));
- return PlasmaSend(sock, MessageType::PlasmaStatusReply, &fbb, message);
-}
-
-int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<fb::PlasmaStatusReply>(data);
- DCHECK(VerifyFlatbuffer(message, data, size));
- return message->object_ids()->size();
-}
-
-Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
- int object_status[], int64_t num_objects) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<fb::PlasmaStatusReply>(data);
- DCHECK(VerifyFlatbuffer(message, data, size));
- for (uoffset_t i = 0; i < num_objects; ++i) {
- object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
- }
- for (uoffset_t i = 0; i < num_objects; ++i) {
- object_status[i] = message->status()->data()[i];
- }
- return Status::OK();
-}
-
// Contains messages.
Status SendContainsRequest(int sock, ObjectID object_id) {
@@ -640,95 +586,6 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
}
return Status::OK();
}
-// Fetch messages.
-
-Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message =
- fb::CreatePlasmaFetchRequest(fbb, ToFlatbuffer(&fbb, object_ids, num_objects));
- return PlasmaSend(sock, MessageType::PlasmaFetchRequest, &fbb, message);
-}
-
-Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<fb::PlasmaFetchRequest>(data);
- DCHECK(VerifyFlatbuffer(message, data, size));
- for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
- object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
- }
- return Status::OK();
-}
-
-// Wait messages.
-
-Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
- int num_ready_objects, int64_t timeout_ms) {
- flatbuffers::FlatBufferBuilder fbb;
-
- std::vector<flatbuffers::Offset<fb::ObjectRequestSpec>> object_request_specs;
- for (int i = 0; i < num_requests; i++) {
- object_request_specs.push_back(fb::CreateObjectRequestSpec(
- fbb, fbb.CreateString(object_requests[i].object_id.binary()),
- static_cast<int>(object_requests[i].type)));
- }
-
- auto message = fb::CreatePlasmaWaitRequest(fbb, fbb.CreateVector(object_request_specs),
- num_ready_objects, timeout_ms);
- return PlasmaSend(sock, MessageType::PlasmaWaitRequest, &fbb, message);
-}
-
-Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
- int64_t* timeout_ms, int* num_ready_objects) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<fb::PlasmaWaitRequest>(data);
- DCHECK(VerifyFlatbuffer(message, data, size));
- *num_ready_objects = message->num_ready_objects();
- *timeout_ms = message->timeout();
-
- for (uoffset_t i = 0; i < message->object_requests()->size(); i++) {
- ObjectID object_id =
- ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str());
- ObjectRequest object_request(
- {object_id,
- static_cast<ObjectRequestType>(message->object_requests()->Get(i)->type()),
- ObjectLocation::Nonexistent});
- object_requests[object_id] = object_request;
- }
- return Status::OK();
-}
-
-Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
- int num_ready_objects) {
- flatbuffers::FlatBufferBuilder fbb;
-
- std::vector<flatbuffers::Offset<fb::ObjectReply>> object_replies;
- for (const auto& entry : object_requests) {
- const auto& object_request = entry.second;
- object_replies.push_back(
- fb::CreateObjectReply(fbb, fbb.CreateString(object_request.object_id.binary()),
- static_cast<fb::ObjectStatus>(object_request.location)));
- }
-
- auto message = fb::CreatePlasmaWaitReply(
- fbb, fbb.CreateVector(object_replies.data(), num_ready_objects), num_ready_objects);
- return PlasmaSend(sock, MessageType::PlasmaWaitReply, &fbb, message);
-}
-
-Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
- int* num_ready_objects) {
- DCHECK(data);
-
- auto message = flatbuffers::GetRoot<fb::PlasmaWaitReply>(data);
- DCHECK(VerifyFlatbuffer(message, data, size));
- *num_ready_objects = message->num_ready_objects();
- for (int i = 0; i < *num_ready_objects; i++) {
- object_requests[i].object_id =
- ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str());
- object_requests[i].location =
- static_cast<ObjectLocation>(message->object_requests()->Get(i)->status());
- }
- return Status::OK();
-}
// Subscribe messages.
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index c820458..0362bd4 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -128,21 +128,6 @@ Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
std::vector<PlasmaError>* errors);
-/* Satus messages. */
-
-Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
-
-Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
- int64_t num_objects);
-
-Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
- int64_t num_objects);
-
-int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size);
-
-Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
- int object_status[], int64_t num_objects);
-
/* Plasma Constains message functions. */
Status SendContainsRequest(int sock, ObjectID object_id);
@@ -184,26 +169,6 @@ Status SendEvictReply(int sock, int64_t num_bytes);
Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes);
-/* Plasma Fetch Remote message functions. */
-
-Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
-
-Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids);
-
-/* Plasma Wait message functions. */
-
-Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
- int num_ready_objects, int64_t timeout_ms);
-
-Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
- int64_t* timeout_ms, int* num_ready_objects);
-
-Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
- int num_ready_objects);
-
-Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
- int* num_ready_objects);
-
/* Plasma Subscribe message functions. */
Status SendSubscribeRequest(int sock);
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 65a9b71..30dc685 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -187,7 +187,6 @@ TEST_F(TestPlasmaStore, DeleteTest) {
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
ASSERT_TRUE(has_object);
- // Avoid race condition of Plasma Manager waiting for notification.
ARROW_CHECK_OK(client_.Release(object_id));
// object_id is marked as to-be-deleted, when it is not in use, it will be deleted.
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
@@ -251,7 +250,6 @@ TEST_F(TestPlasmaStore, ContainsTest) {
// First create object.
std::vector<uint8_t> data(100, 0);
CreateObject(client_, object_id, {42}, data);
- // Avoid race condition of Plasma Manager waiting for notification.
std::vector<ObjectBuffer> object_buffers;
ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 085ae97..66d651d 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -254,44 +254,6 @@ TEST(PlasmaSerialization, DeleteReply) {
close(fd);
}
-TEST(PlasmaSerialization, StatusRequest) {
- int fd = create_temp_file();
- constexpr int64_t num_objects = 2;
- ObjectID object_ids[num_objects];
- object_ids[0] = random_object_id();
- object_ids[1] = random_object_id();
- ARROW_CHECK_OK(SendStatusRequest(fd, object_ids, num_objects));
- std::vector<uint8_t> data =
- read_message_from_file(fd, MessageType::PlasmaStatusRequest);
- ObjectID object_ids_read[num_objects];
- ARROW_CHECK_OK(
- ReadStatusRequest(data.data(), data.size(), object_ids_read, num_objects));
- ASSERT_EQ(object_ids[0], object_ids_read[0]);
- ASSERT_EQ(object_ids[1], object_ids_read[1]);
- close(fd);
-}
-
-TEST(PlasmaSerialization, StatusReply) {
- int fd = create_temp_file();
- ObjectID object_ids[2];
- object_ids[0] = random_object_id();
- object_ids[1] = random_object_id();
- int object_statuses[2] = {42, 43};
- ARROW_CHECK_OK(SendStatusReply(fd, object_ids, object_statuses, 2));
- std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaStatusReply);
- int64_t num_objects = ReadStatusReply_num_objects(data.data(), data.size());
-
- std::vector<ObjectID> object_ids_read(num_objects);
- std::vector<int> object_statuses_read(num_objects);
- ARROW_CHECK_OK(ReadStatusReply(data.data(), data.size(), object_ids_read.data(),
- object_statuses_read.data(), num_objects));
- ASSERT_EQ(object_ids[0], object_ids_read[0]);
- ASSERT_EQ(object_ids[1], object_ids_read[1]);
- ASSERT_EQ(object_statuses[0], object_statuses_read[0]);
- ASSERT_EQ(object_statuses[1], object_statuses_read[1]);
- close(fd);
-}
-
TEST(PlasmaSerialization, EvictRequest) {
int fd = create_temp_file();
int64_t num_bytes = 111;
@@ -314,84 +276,6 @@ TEST(PlasmaSerialization, EvictReply) {
close(fd);
}
-TEST(PlasmaSerialization, FetchRequest) {
- int fd = create_temp_file();
- ObjectID object_ids[2];
- object_ids[0] = random_object_id();
- object_ids[1] = random_object_id();
- ARROW_CHECK_OK(SendFetchRequest(fd, object_ids, 2));
- std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaFetchRequest);
- std::vector<ObjectID> object_ids_read;
- ARROW_CHECK_OK(ReadFetchRequest(data.data(), data.size(), object_ids_read));
- ASSERT_EQ(object_ids[0], object_ids_read[0]);
- ASSERT_EQ(object_ids[1], object_ids_read[1]);
- close(fd);
-}
-
-TEST(PlasmaSerialization, WaitRequest) {
- int fd = create_temp_file();
- const int num_objects_in = 2;
- ObjectRequest object_requests_in[num_objects_in] = {
- ObjectRequest({random_object_id(), ObjectRequestType::PLASMA_QUERY_ANYWHERE,
- ObjectLocation::Local}),
- ObjectRequest({random_object_id(), ObjectRequestType::PLASMA_QUERY_LOCAL,
- ObjectLocation::Local})};
- const int num_ready_objects_in = 1;
- int64_t timeout_ms = 1000;
-
- ARROW_CHECK_OK(SendWaitRequest(fd, &object_requests_in[0], num_objects_in,
- num_ready_objects_in, timeout_ms));
- /* Read message back. */
- std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaWaitRequest);
- int num_ready_objects_out;
- int64_t timeout_ms_read;
- ObjectRequestMap object_requests_out;
- ARROW_CHECK_OK(ReadWaitRequest(data.data(), data.size(), object_requests_out,
- &timeout_ms_read, &num_ready_objects_out));
- ASSERT_EQ(num_objects_in, object_requests_out.size());
- ASSERT_EQ(num_ready_objects_out, num_ready_objects_in);
- for (int i = 0; i < num_objects_in; i++) {
- const ObjectID& object_id = object_requests_in[i].object_id;
- ASSERT_EQ(1, object_requests_out.count(object_id));
- const auto& entry = object_requests_out.find(object_id);
- ASSERT_TRUE(entry != object_requests_out.end());
- ASSERT_EQ(entry->second.object_id, object_requests_in[i].object_id);
- ASSERT_EQ(entry->second.type, object_requests_in[i].type);
- }
- close(fd);
-}
-
-TEST(PlasmaSerialization, WaitReply) {
- int fd = create_temp_file();
- const int num_objects_in = 2;
- /* Create a map with two ObjectRequests in it. */
- ObjectRequestMap objects_in(num_objects_in);
- ObjectID id1 = random_object_id();
- objects_in[id1] =
- ObjectRequest({id1, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectLocation::Local});
- ObjectID id2 = random_object_id();
- objects_in[id2] = ObjectRequest(
- {id2, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectLocation::Nonexistent});
-
- ARROW_CHECK_OK(SendWaitReply(fd, objects_in, num_objects_in));
- /* Read message back. */
- std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaWaitReply);
- ObjectRequest objects_out[2];
- int num_objects_out;
- ARROW_CHECK_OK(
- ReadWaitReply(data.data(), data.size(), &objects_out[0], &num_objects_out));
- ASSERT_EQ(num_objects_in, num_objects_out);
- for (int i = 0; i < num_objects_out; i++) {
- /* Each object request must appear exactly once. */
- ASSERT_EQ(objects_in.count(objects_out[i].object_id), 1);
- const auto& entry = objects_in.find(objects_out[i].object_id);
- ASSERT_TRUE(entry != objects_in.end());
- ASSERT_EQ(entry->second.object_id, objects_out[i].object_id);
- ASSERT_EQ(entry->second.location, objects_out[i].location);
- }
- close(fd);
-}
-
TEST(PlasmaSerialization, DataRequest) {
int fd = create_temp_file();
ObjectID object_id1 = random_object_id();
diff --git a/docs/source/python/plasma.rst b/docs/source/python/plasma.rst
index 3df68ef..660c5fb 100644
--- a/docs/source/python/plasma.rst
+++ b/docs/source/python/plasma.rst
@@ -60,7 +60,7 @@ socket name:
.. code-block:: python
import pyarrow.plasma as plasma
- client = plasma.connect("/tmp/plasma", "")
+ client = plasma.connect("/tmp/plasma")
If the following error occurs from running the above Python code, that
means that either the socket given is incorrect, or the ``./plasma_store`` is
@@ -68,7 +68,7 @@ not currently running. Check to see if the Plasma store is still running.
.. code-block:: shell
- >>> client = plasma.connect("/tmp/plasma", "")
+ >>> client = plasma.connect("/tmp/plasma")
Connection to socket failed for pathname /tmp/plasma
Could not connect to socket /tmp/plasma
@@ -179,7 +179,7 @@ the object buffer.
# Create a different client. Note that this second client could be
# created in the same or in a separate, concurrent Python session.
- client2 = plasma.connect("/tmp/plasma", "")
+ client2 = plasma.connect("/tmp/plasma")
# Get the object in the second client. This blocks until the object has been sealed.
object_id2 = plasma.ObjectID(20 * b"a")
@@ -221,7 +221,7 @@ of the object info might change in the future):
import pyarrow.plasma as plasma
import time
- client = plasma.connect("/tmp/plasma", "")
+ client = plasma.connect("/tmp/plasma")
client.put("hello, world")
# Sleep a little so we get different creation times
@@ -452,7 +452,7 @@ You can test this with the following script:
import pyarrow.plasma as plasma
import time
- client = plasma.connect("/tmp/plasma", "")
+ client = plasma.connect("/tmp/plasma")
data = np.random.randn(100000000)
tensor = pa.Tensor.from_numpy(data)
diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
index 3b67bc0..8d6eec0 100644
--- a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
+++ b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
@@ -80,16 +80,6 @@ public interface ObjectStoreLink {
List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs);
/**
- * Wait until <tt>numReturns</tt> objects in <tt>objectIds</tt> are ready.
- *
- * @param objectIds List of object IDs to wait for.
- * @param timeoutMs Return to the caller after <tt>timeoutMs</tt> milliseconds.
- * @param numReturns We are waiting for this number of objects to be ready.
- * @return List of object IDs that are ready
- */
- List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns);
-
- /**
* Compute the hash of an object in the object store.
*
* @param objectId The object ID used to identify the object.
@@ -99,23 +89,6 @@ public interface ObjectStoreLink {
byte[] hash(byte[] objectId);
/**
- * Fetch the object with the given ID from other plasma manager instances.
- *
- * @param objectId The object ID used to identify the object.
- */
- default void fetch(byte[] objectId) {
- byte[][] objectIds = {objectId};
- fetch(objectIds);
- }
-
- /**
- * Fetch the objects with the given IDs from other plasma manager instances.
- *
- * @param objectIds List of object IDs used to identify the objects.
- */
- void fetch(byte[][] objectIds);
-
- /**
* Evict some objects to recover given count of bytes.
*
* @param numBytes The number of bytes to attempt to recover.
diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
index db1f35e..d69b54d 100644
--- a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
+++ b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
@@ -82,34 +82,11 @@ public class PlasmaClient implements ObjectStoreLink {
}
@Override
- public List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns) {
- byte[][] readys = PlasmaClientJNI.wait(conn, objectIds, timeoutMs, numReturns);
-
- List<byte[]> ret = new ArrayList<>();
- for (byte[] ready : readys) {
- for (byte[] id : objectIds) {
- if (Arrays.equals(ready, id)) {
- ret.add(id);
- break;
- }
- }
- }
-
- assert (ret.size() == readys.length);
- return ret;
- }
-
- @Override
public byte[] hash(byte[] objectId) {
return PlasmaClientJNI.hash(conn, objectId);
}
@Override
- public void fetch(byte[][] objectIds) {
- PlasmaClientJNI.fetch(conn, objectIds);
- }
-
- @Override
public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs);
assert bufs.length == objectIds.length;
diff --git a/python/benchmarks/plasma.py b/python/benchmarks/plasma.py
index 7cefcdf..398ec72 100644
--- a/python/benchmarks/plasma.py
+++ b/python/benchmarks/plasma.py
@@ -32,7 +32,7 @@ class SimplePlasmaThroughput(object):
self.plasma_store_ctx = plasma.start_plasma_store(
plasma_store_memory=10**9)
plasma_store_name, p = self.plasma_store_ctx.__enter__()
- self.plasma_client = plasma.connect(plasma_store_name, "", 64)
+ self.plasma_client = plasma.connect(plasma_store_name)
self.data = np.random.randn(size // 8)
@@ -52,7 +52,7 @@ class SimplePlasmaLatency(object):
self.plasma_store_ctx = plasma.start_plasma_store(
plasma_store_memory=10**9)
plasma_store_name, p = self.plasma_store_ctx.__enter__()
- self.plasma_client = plasma.connect(plasma_store_name, "", 64)
+ self.plasma_client = plasma.connect(plasma_store_name)
def teardown(self):
self.plasma_store_ctx.__exit__(None, None, None)
diff --git a/python/examples/plasma/sorting/sort_df.py b/python/examples/plasma/sorting/sort_df.py
index 2e4df58..2a51759 100644
--- a/python/examples/plasma/sorting/sort_df.py
+++ b/python/examples/plasma/sorting/sort_df.py
@@ -49,7 +49,7 @@ column_to_sort = column_names[0]
# Connect to clients
def connect():
global client
- client = plasma.connect('/tmp/store', '', 0)
+ client = plasma.connect('/tmp/store')
np.random.seed(int(time.time() * 10e7) % 10000000)
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index f7db3b4..cfaa39c 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -63,11 +63,6 @@ cdef extern from "plasma/common.h" nogil:
@staticmethod
int64_t size()
- cdef struct CObjectRequest" plasma::ObjectRequest":
- CUniqueID object_id
- int type
- int location
-
cdef enum CObjectState" plasma::ObjectState":
PLASMA_CREATED" plasma::ObjectState::PLASMA_CREATED"
PLASMA_SEALED" plasma::ObjectState::PLASMA_SEALED"
@@ -92,14 +87,6 @@ cdef extern from "plasma/common.h" nogil:
cdef extern from "plasma/common.h":
cdef int64_t kDigestSize" plasma::kDigestSize"
- cdef enum ObjectRequestType:
- PLASMA_QUERY_LOCAL"plasma::ObjectRequestType::PLASMA_QUERY_LOCAL",
- PLASMA_QUERY_ANYWHERE"plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE"
-
- cdef enum ObjectLocation:
- ObjectStatusLocal"plasma::ObjectLocation::Local"
- ObjectStatusRemote"plasma::ObjectLocation::Remote"
-
cdef extern from "plasma/client.h" nogil:
cdef cppclass CPlasmaClient" plasma::PlasmaClient":
@@ -143,16 +130,6 @@ cdef extern from "plasma/client.h" nogil:
CStatus Disconnect()
- CStatus Fetch(int num_object_ids, const CUniqueID* object_ids)
-
- CStatus Wait(int64_t num_object_requests,
- CObjectRequest* object_requests,
- int num_ready_objects, int64_t timeout_ms,
- int* num_objects_ready)
-
- CStatus Transfer(const char* addr, int port,
- const CUniqueID& object_id)
-
CStatus Delete(const c_vector[CUniqueID] object_ids)
cdef extern from "plasma/client.h" nogil:
@@ -285,13 +262,11 @@ cdef class PlasmaClient:
shared_ptr[CPlasmaClient] client
int notification_fd
c_string store_socket_name
- c_string manager_socket_name
def __cinit__(self):
self.client.reset(new CPlasmaClient())
self.notification_fd = -1
self.store_socket_name = b""
- self.manager_socket_name = b""
cdef _get_object_buffers(self, object_ids, int64_t timeout_ms,
c_vector[CObjectBuffer]* result):
@@ -315,10 +290,6 @@ cdef class PlasmaClient:
def store_socket_name(self):
return self.store_socket_name.decode()
- @property
- def manager_socket_name(self):
- return self.manager_socket_name.decode()
-
def create(self, ObjectID object_id, int64_t data_size,
c_string metadata=b""):
"""
@@ -642,95 +613,6 @@ cdef class PlasmaClient:
check_status(self.client.get().Evict(num_bytes, num_bytes_evicted))
return num_bytes_evicted
- def transfer(self, address, int port, ObjectID object_id):
- """
- Transfer local object with id object_id to another plasma instance
-
- Parameters
- ----------
- addr : str
- IPv4 address of the plasma instance the object is sent to.
- port : int
- Port number of the plasma instance the object is sent to.
- object_id : str
- A string used to identify an object.
- """
- cdef c_string addr = address.encode()
- with nogil:
- check_status(self.client.get()
- .Transfer(addr.c_str(), port, object_id.data))
-
- def fetch(self, object_ids):
- """
- Fetch the objects with the given IDs from other plasma managers.
-
- Parameters
- ----------
- object_ids : list
- A list of strings used to identify the objects.
- """
- cdef c_vector[CUniqueID] ids
- cdef ObjectID object_id
- for object_id in object_ids:
- ids.push_back(object_id.data)
- with nogil:
- check_status(self.client.get().Fetch(ids.size(), ids.data()))
-
- def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT,
- int num_returns=1):
- """
- Wait until num_returns objects in object_ids are ready.
- Currently, the object ID arguments to wait must be unique.
-
- Parameters
- ----------
- object_ids : list
- List of object IDs to wait for.
- timeout :int
- Return to the caller after timeout milliseconds.
- num_returns : int
- We are waiting for this number of objects to be ready.
-
- Returns
- -------
- list
- List of object IDs that are ready.
- list
- List of object IDs we might still wait on.
- """
- # Check that the object ID arguments are unique. The plasma manager
- # currently crashes if given duplicate object IDs.
- if len(object_ids) != len(set(object_ids)):
- raise Exception("Wait requires a list of unique object IDs.")
- cdef int64_t num_object_requests = len(object_ids)
- cdef c_vector[CObjectRequest] object_requests = (
- c_vector[CObjectRequest](num_object_requests))
- cdef int num_objects_ready = 0
- cdef ObjectID object_id
- for i, object_id in enumerate(object_ids):
- object_requests[i].object_id = object_id.data
- object_requests[i].type = PLASMA_QUERY_ANYWHERE
- with nogil:
- check_status(self.client.get().Wait(num_object_requests,
- object_requests.data(),
- num_returns, timeout,
- &num_objects_ready))
- cdef int num_to_return = min(num_objects_ready, num_returns)
- ready_ids = []
- waiting_ids = set(object_ids)
- cdef int num_returned = 0
- for i in range(len(object_ids)):
- if num_returned == num_to_return:
- break
- if (object_requests[i].location == ObjectStatusLocal or
- object_requests[i].location == ObjectStatusRemote):
- ready_ids.append(
- ObjectID(object_requests[i].object_id.binary()))
- waiting_ids.discard(
- ObjectID(object_requests[i].object_id.binary()))
- num_returned += 1
- return ready_ids, list(waiting_ids)
-
def subscribe(self):
"""Subscribe to notifications about sealed objects."""
with nogil:
@@ -873,7 +755,7 @@ cdef class PlasmaClient:
return result
-def connect(store_socket_name, manager_socket_name, int release_delay=0,
+def connect(store_socket_name, manager_socket_name=None, int release_delay=0,
int num_retries=-1):
"""
Return a new PlasmaClient that is connected a plasma store and
@@ -884,22 +766,24 @@ def connect(store_socket_name, manager_socket_name, int release_delay=0,
store_socket_name : str
Name of the socket the plasma store is listening at.
manager_socket_name : str
- Name of the socket the plasma manager is listening at.
+ This parameter is deprecated and has no effect.
release_delay : int
This parameter is deprecated and has no effect.
num_retries : int, default -1
Number of times to try to connect to plasma store. Default value of -1
uses the default (50)
"""
+ if manager_socket_name is not None:
+ warnings.warn(
+ "manager_socket_name in PlasmaClient.connect is deprecated",
+ FutureWarning)
cdef PlasmaClient result = PlasmaClient()
result.store_socket_name = store_socket_name.encode()
- result.manager_socket_name = manager_socket_name.encode()
if release_delay != 0:
warnings.warn("release_delay in PlasmaClient.connect is deprecated",
FutureWarning)
with nogil:
check_status(result.client.get()
- .Connect(result.store_socket_name,
- result.manager_socket_name,
+ .Connect(result.store_socket_name, b"",
release_delay, num_retries))
return result
diff --git a/python/pyarrow/tensorflow/plasma_op.cc b/python/pyarrow/tensorflow/plasma_op.cc
index 4e6449a..852be33 100644
--- a/python/pyarrow/tensorflow/plasma_op.cc
+++ b/python/pyarrow/tensorflow/plasma_op.cc
@@ -71,13 +71,10 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
explicit TensorToPlasmaOp(tf::OpKernelConstruction* context) : tf::AsyncOpKernel(context) {
OP_REQUIRES_OK(context, context->GetAttr("plasma_store_socket_name",
&plasma_store_socket_name_));
- OP_REQUIRES_OK(context, context->GetAttr("plasma_manager_socket_name",
- &plasma_manager_socket_name_));
tf::mutex_lock lock(mu_);
if (!connected_) {
VLOG(1) << "Connecting to Plasma...";
- ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
- plasma_manager_socket_name_));
+ ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_));
VLOG(1) << "Connected!";
connected_ = true;
}
@@ -226,7 +223,6 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
private:
std::string plasma_store_socket_name_;
- std::string plasma_manager_socket_name_;
tf::mutex mu_;
bool connected_ = false;
@@ -243,13 +239,10 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
explicit PlasmaToTensorOp(tf::OpKernelConstruction* context) : tf::AsyncOpKernel(context) {
OP_REQUIRES_OK(context, context->GetAttr("plasma_store_socket_name",
&plasma_store_socket_name_));
- OP_REQUIRES_OK(context, context->GetAttr("plasma_manager_socket_name",
- &plasma_manager_socket_name_));
tf::mutex_lock lock(mu_);
if (!connected_) {
VLOG(1) << "Connecting to Plasma...";
- ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
- plasma_manager_socket_name_));
+ ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_));
VLOG(1) << "Connected!";
connected_ = true;
}
@@ -364,7 +357,6 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
private:
std::string plasma_store_socket_name_;
- std::string plasma_manager_socket_name_;
tf::mutex mu_;
bool connected_ = false;
@@ -375,8 +367,7 @@ REGISTER_OP("TensorToPlasma")
.Input("input_tensor: dtypes")
.Input("plasma_object_id: string")
.Attr("dtypes: list(type)")
- .Attr("plasma_store_socket_name: string")
- .Attr("plasma_manager_socket_name: string");
+ .Attr("plasma_store_socket_name: string");
REGISTER_KERNEL_BUILDER(Name("TensorToPlasma").Device(tf::DEVICE_CPU),
TensorToPlasmaOp<CPUDevice>);
@@ -389,8 +380,7 @@ REGISTER_OP("PlasmaToTensor")
.Input("plasma_object_id: string")
.Output("tensor: dtype")
.Attr("dtype: type")
- .Attr("plasma_store_socket_name: string")
- .Attr("plasma_manager_socket_name: string");
+ .Attr("plasma_store_socket_name: string");
REGISTER_KERNEL_BUILDER(Name("PlasmaToTensor").Device(tf::DEVICE_CPU),
PlasmaToTensorOp<CPUDevice>);
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 66449e6..05375d7 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -121,8 +121,8 @@ class TestPlasmaClient(object):
use_one_memory_mapped_file=use_one_memory_mapped_file)
self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
# Connect to Plasma.
- self.plasma_client = plasma.connect(self.plasma_store_name, "")
- self.plasma_client2 = plasma.connect(self.plasma_store_name, "")
+ self.plasma_client = plasma.connect(self.plasma_store_name)
+ self.plasma_client2 = plasma.connect(self.plasma_store_name)
def teardown_method(self, test_method):
try:
@@ -147,7 +147,7 @@ class TestPlasmaClient(object):
import pyarrow.plasma as plasma
# ARROW-1264
with pytest.raises(IOError):
- plasma.connect('unknown-store-name', '', 0, 1)
+ plasma.connect('unknown-store-name', num_retries=1)
def test_create(self):
# Create an object id string.
@@ -860,7 +860,7 @@ class TestPlasmaClient(object):
object_id = random_object_id()
def client_blocked_in_get(plasma_store_name):
- client = plasma.connect(self.plasma_store_name, "", 0)
+ client = plasma.connect(self.plasma_store_name)
# Try to get an object ID that doesn't exist. This should block.
client.get([object_id])
@@ -889,7 +889,7 @@ class TestPlasmaClient(object):
object_ids = [random_object_id() for _ in range(10)]
def client_get_multiple(plasma_store_name):
- client = plasma.connect(self.plasma_store_name, "", 0)
+ client = plasma.connect(self.plasma_store_name)
# Try to get an object ID that doesn't exist. This should block.
client.get(object_ids)
@@ -948,7 +948,7 @@ def test_use_huge_pages():
plasma_store_memory=2*10**9,
plasma_directory="/mnt/hugepages",
use_hugepages=True) as (plasma_store_name, p):
- plasma_client = plasma.connect(plasma_store_name, "")
+ plasma_client = plasma.connect(plasma_store_name)
create_object(plasma_client, 10**8)
@@ -962,7 +962,7 @@ def test_plasma_client_sharing():
with plasma.start_plasma_store(
plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
as (plasma_store_name, p):
- plasma_client = plasma.connect(plasma_store_name, "")
+ plasma_client = plasma.connect(plasma_store_name)
object_id = plasma_client.put(np.zeros(3))
buf = plasma_client.get(object_id)
del plasma_client
@@ -977,7 +977,7 @@ def test_plasma_list():
with plasma.start_plasma_store(
plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
as (plasma_store_name, p):
- plasma_client = plasma.connect(plasma_store_name, "", 0)
+ plasma_client = plasma.connect(plasma_store_name)
# Test sizes
u, _, _ = create_object(plasma_client, 11, metadata_size=7, seal=False)
diff --git a/python/pyarrow/tests/test_plasma_tf_op.py b/python/pyarrow/tests/test_plasma_tf_op.py
index 51e8b28..e239055 100644
--- a/python/pyarrow/tests/test_plasma_tf_op.py
+++ b/python/pyarrow/tests/test_plasma_tf_op.py
@@ -37,15 +37,13 @@ def run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
return plasma.tf_plasma_op.tensor_to_plasma(
[data_tensor, ones_tensor],
object_id,
- plasma_store_socket_name=plasma_store_name,
- plasma_manager_socket_name="")
+ plasma_store_socket_name=plasma_store_name)
def FromPlasma():
return plasma.tf_plasma_op.plasma_to_tensor(
object_id,
dtype=tf.as_dtype(dtype),
- plasma_store_socket_name=plasma_store_name,
- plasma_manager_socket_name="")
+ plasma_store_socket_name=plasma_store_name)
with tf.device(FORCE_DEVICE):
to_plasma = ToPlasma()
@@ -94,7 +92,7 @@ def test_plasma_tf_op(use_gpu=False):
pytest.skip("TensorFlow Op not found")
with plasma.start_plasma_store(10**8) as (plasma_store_name, p):
- client = plasma.connect(plasma_store_name, "")
+ client = plasma.connect(plasma_store_name)
for dtype in [np.float32, np.float64,
np.int8, np.int16, np.int32, np.int64]:
run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,