You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/08/26 20:24:12 UTC
[arrow] branch master updated: ARROW-3116: [Plasma] Add "ls" to object store
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 628b74b ARROW-3116: [Plasma] Add "ls" to object store
628b74b is described below
commit 628b74b0de0a3d7ea6d7f424f605922fb8637b46
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Sun Aug 26 13:24:03 2018 -0700
ARROW-3116: [Plasma] Add "ls" to object store
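This adds a `list` operation to the Plasma client API (`PlasmaClient::List` in C++,
`PlasmaClient.list` in Python), which returns information about every object in the store.
It can be used like this: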
```python
import pyarrow.plasma as plasma
import time
client = plasma.connect("/tmp/plasma", "", 0)
client.put("hello, world")
# Sleep a little so we get different creation times
time.sleep(2)
client.put("another object")
# Create an object that is not sealed yet
object_id = plasma.ObjectID.from_random()
client.create(object_id, 100)
print(client.list())
# Output (wrapped for readability):
# {ObjectID(4cba8f80c54c6d265b46c2cdfcee6e32348b12be): {'construct_duration': 0,
#                                                       'create_time': 1535223642,
#                                                       'data_size': 460,
#                                                       'metadata_size': 0,
#                                                       'ref_count': 0,
#                                                       'state': 'sealed'},
#  ObjectID(a7598230b0c26464c9d9c99ae14773ee81485428): {'construct_duration': 0,
#                                                       'create_time': 1535223644,
#                                                       'data_size': 460,
#                                                       'metadata_size': 0,
#                                                       'ref_count': 0,
#                                                       'state': 'sealed'},
#  ObjectID(e603ab0c92098ebf08f90bfcea33ff98f6476870): {'construct_duration': -1,
#                                                       'create_time': 1535223644,
#                                                       'data_size': 100,
#                                                       'metadata_size': 0,
#                                                       'ref_count': 1,
#                                                       'state': 'created'}}
```
Here `create_time` is the Unix timestamp (in seconds) at which the object was created, and
`construct_duration` is the number of seconds between creating and sealing the object; it is
-1 for an object that has not been sealed yet.
Author: Philipp Moritz <pc...@gmail.com>
Closes #2470 from pcmoritz/plasma-list and squashes the following commits:
5ff4e355 <Philipp Moritz> fix
32c36a75 <Philipp Moritz> minor fix
a58db5bc <Philipp Moritz> add more documentation
1f0c91de <Philipp Moritz> fix
1f384ee9 <Philipp Moritz> add documentation
0958e4d1 <Philipp Moritz> merge
ce122957 <Philipp Moritz> add test
a6ba6f6f <Philipp Moritz> fix
62772a87 <Philipp Moritz> add timestamp
bb8d52f0 <Philipp Moritz> fix
6572943d <Philipp Moritz> fix
d343d4fb <Philipp Moritz> linting
96040ad4 <Philipp Moritz> cleanups
6f29cec0 <Philipp Moritz> fix test
79a2b549 <Philipp Moritz> make variables unique
93a9ca53 <Philipp Moritz> fix linting
f7f36068 <Philipp Moritz> add test cases and fixes
189928be <Philipp Moritz> add more fields
deb06b42 <Philipp Moritz> get list working
ac1fae62 <Philipp Moritz> update
a07db83b <Philipp Moritz> add list command to store
400d326f <Philipp Moritz> create object list
---
cpp/src/plasma/client.cc | 11 ++++++
cpp/src/plasma/client.h | 15 +++++++
cpp/src/plasma/common.h | 52 +++++++++++++++++++++++++
cpp/src/plasma/format/common.fbs | 5 ++-
cpp/src/plasma/format/plasma.fbs | 12 ++++++
cpp/src/plasma/plasma.h | 43 +-------------------
cpp/src/plasma/protocol.cc | 49 +++++++++++++++++++++++
cpp/src/plasma/protocol.h | 10 +++++
cpp/src/plasma/store.cc | 9 +++++
python/doc/source/plasma.rst | 43 ++++++++++++++++++++
python/pyarrow/_plasma.pyx | 78 +++++++++++++++++++++++++++++++++++++
python/pyarrow/tests/test_plasma.py | 50 ++++++++++++++++++++++++
12 files changed, 334 insertions(+), 43 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 3c30f3e..c5f372f 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -184,6 +184,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status Contains(const ObjectID& object_id, bool* has_object);
+ Status List(ObjectTable* objects);
+
Status Abort(const ObjectID& object_id);
Status Seal(const ObjectID& object_id);
@@ -705,6 +707,13 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
return Status::OK();
}
+Status PlasmaClient::Impl::List(ObjectTable* objects) {
+ RETURN_NOT_OK(SendListRequest(store_conn_));
+ std::vector<uint8_t> buffer;
+ RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer));
+ return ReadListReply(buffer.data(), buffer.size(), objects);
+}
+
static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) {
XXH64_state_t hash_state;
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
@@ -1057,6 +1066,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
return impl_->Contains(object_id, has_object);
}
+Status PlasmaClient::List(ObjectTable* objects) { return impl_->List(objects); }
+
Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(object_id); }
Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); }
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index fe00193..a95b992 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -151,6 +151,21 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status.
Status Contains(const ObjectID& object_id, bool* has_object);
+ /// List all the objects in the object store.
+ ///
+ /// This API is experimental and might change in the future.
+ ///
+ /// \param[out] objects ObjectTable of objects in the store. For each entry
+ /// in the map, the following fields are available:
+ /// - metadata_size: Size of the object metadata in bytes
+ /// - data_size: Size of the object data in bytes
+ /// - ref_count: Number of clients referencing the object buffer
+ /// - create_time: Unix timestamp of the object creation
+ /// - construct_duration: Object creation time in seconds
+ /// - state: Is the object still being created or already sealed?
+ /// \return The return status.
+ Status List(ObjectTable* objects);
+
/// Abort an unsealed object in the object store. If the abort succeeds, then
/// it will be as if the object was never created at all. The unsealed object
/// must have only a single reference (the one that would have been removed by
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index a8cb931..9c3c0c9 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -18,13 +18,17 @@
#ifndef PLASMA_COMMON_H
#define PLASMA_COMMON_H
+#include <stddef.h>
+
#include <cstring>
+#include <memory>
#include <string>
// TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++
// and get rid of the next three lines:
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
+#include <unordered_map>
#include "plasma/compat.h"
@@ -83,6 +87,54 @@ struct ObjectRequest {
ObjectLocation location;
};
+enum class ObjectState : int {
+ /// Object was created but not sealed in the local Plasma Store.
+ PLASMA_CREATED = 1,
+ /// Object is sealed and stored in the local Plasma Store.
+ PLASMA_SEALED
+};
+
+/// This type is used by the Plasma store. It is here because it is exposed to
+/// the eviction policy.
+struct ObjectTableEntry {
+ ObjectTableEntry();
+
+ ~ObjectTableEntry();
+
+ /// Memory mapped file containing the object.
+ int fd;
+ /// Device number.
+ int device_num;
+ /// Size of the underlying map.
+ int64_t map_size;
+ /// Offset from the base of the mmap.
+ ptrdiff_t offset;
+ /// Pointer to the object data. Needed to free the object.
+ uint8_t* pointer;
+ /// Size of the object in bytes.
+ int64_t data_size;
+ /// Size of the object metadata in bytes.
+ int64_t metadata_size;
+#ifdef PLASMA_GPU
+ /// IPC GPU handle to share with clients.
+ std::shared_ptr<CudaIpcMemHandle> ipc_handle;
+#endif
+ /// Number of clients currently using this object.
+ int ref_count;
+ /// Unix epoch of when this object was created.
+ int64_t create_time;
+ /// How long creation of this object took.
+ int64_t construct_duration;
+
+ /// The state of the object, e.g., whether it is open or sealed.
+ ObjectState state;
+ /// The digest of the object. Used to see if two objects are the same.
+ unsigned char digest[kDigestSize];
+};
+
+/// Mapping from ObjectIDs to information about the object.
+typedef std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>> ObjectTable;
+
/// Globally accessible reference to plasma store configuration.
/// TODO(pcm): This can be avoided with some refactoring of existing code
/// by making it possible to pass a context object through dlmalloc.
diff --git a/cpp/src/plasma/format/common.fbs b/cpp/src/plasma/format/common.fbs
index 7f66bf6..818827a 100644
--- a/cpp/src/plasma/format/common.fbs
+++ b/cpp/src/plasma/format/common.fbs
@@ -25,11 +25,14 @@ table ObjectInfo {
data_size: long;
// Number of bytes the metadata of this object occupies in memory.
metadata_size: long;
+ // Number of clients using the objects.
+ ref_count: int;
// Unix epoch of when this object was created.
create_time: long;
// How long creation of this object took.
construct_duration: long;
- // Hash of the object content.
+ // Hash of the object content. If the object is not sealed yet this is
+ // an empty string.
digest: string;
// Specifies if this object was deleted or added.
is_deletion: bool;
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 082ae9c..ded714a 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+include "common.fbs";
+
// Plasma protocol specification
namespace plasma.flatbuf;
@@ -44,6 +46,9 @@ enum MessageType:long {
// See if the store contains an object (will be deprecated).
PlasmaContainsRequest,
PlasmaContainsReply,
+ // List all objects in the store.
+ PlasmaListRequest,
+ PlasmaListReply,
// Get information for a newly connecting client.
PlasmaConnectRequest,
PlasmaConnectReply,
@@ -257,6 +262,13 @@ table PlasmaContainsReply {
has_object: int;
}
+table PlasmaListRequest {
+}
+
+table PlasmaListReply {
+ objects: [ObjectInfo];
+}
+
// PlasmaConnect is used by a plasma client the first time it connects with the
// store. This is not really necessary, but is used to get some information
// about the store such as its memory capacity.
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 57ba882..b4760fc 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -95,13 +95,6 @@ struct PlasmaObject {
int device_num;
};
-enum class ObjectState : int {
- /// Object was created but not sealed in the local Plasma Store.
- PLASMA_CREATED = 1,
- /// Object is sealed and stored in the local Plasma Store.
- PLASMA_SEALED
-};
-
enum class ObjectStatus : int {
/// The object was not found.
OBJECT_NOT_FOUND = 0,
@@ -109,44 +102,10 @@ enum class ObjectStatus : int {
OBJECT_FOUND = 1
};
-/// This type is used by the Plasma store. It is here because it is exposed to
-/// the eviction policy.
-struct ObjectTableEntry {
- ObjectTableEntry();
-
- ~ObjectTableEntry();
-
- /// Memory mapped file containing the object.
- int fd;
- /// Device number.
- int device_num;
- /// Size of the underlying map.
- int64_t map_size;
- /// Offset from the base of the mmap.
- ptrdiff_t offset;
- /// Pointer to the object data. Needed to free the object.
- uint8_t* pointer;
- /// Size of the object in bytes.
- int64_t data_size;
- /// Size of the object metadata in bytes.
- int64_t metadata_size;
-#ifdef PLASMA_GPU
- /// IPC GPU handle to share with clients.
- std::shared_ptr<CudaIpcMemHandle> ipc_handle;
-#endif
- /// Number of clients currently using this object.
- int ref_count;
-
- /// The state of the object, e.g., whether it is open or sealed.
- ObjectState state;
- /// The digest of the object. Used to see if two objects are the same.
- unsigned char digest[kDigestSize];
-};
-
/// The plasma store information that is exposed to the eviction policy.
struct PlasmaStoreInfo {
/// Objects that are in the Plasma store.
- std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>> objects;
+ ObjectTable objects;
/// The amount of memory (in bytes) that we allow to be allocated in the
/// store.
int64_t memory_capacity;
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index da56249..5b93b65 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -17,6 +17,8 @@
#include "plasma/protocol.h"
+#include <utility>
+
#include "flatbuffers/flatbuffers.h"
#include "plasma/plasma_generated.h"
@@ -410,6 +412,53 @@ Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
return Status::OK();
}
+// List messages.
+
+Status SendListRequest(int sock) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaListRequest(fbb);
+ return PlasmaSend(sock, MessageType::PlasmaListRequest, &fbb, message);
+}
+
+Status ReadListRequest(uint8_t* data, size_t size) { return Status::OK(); }
+
+Status SendListReply(int sock, const ObjectTable& objects) {
+ flatbuffers::FlatBufferBuilder fbb;
+ std::vector<flatbuffers::Offset<fb::ObjectInfo>> object_infos;
+ for (auto const& entry : objects) {
+ auto digest = entry.second->state == ObjectState::PLASMA_CREATED
+ ? fbb.CreateString("")
+ : fbb.CreateString(reinterpret_cast<char*>(entry.second->digest),
+ kDigestSize);
+ auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.binary()),
+ entry.second->data_size, entry.second->metadata_size,
+ entry.second->ref_count, entry.second->create_time,
+ entry.second->construct_duration, digest);
+ object_infos.push_back(info);
+ }
+ auto message = fb::CreatePlasmaListReply(fbb, fbb.CreateVector(object_infos));
+ return PlasmaSend(sock, MessageType::PlasmaListReply, &fbb, message);
+}
+
+Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ for (auto const& object : *message->objects()) {
+ ObjectID object_id = ObjectID::from_binary(object->object_id()->str());
+ auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+ entry->data_size = object->data_size();
+ entry->metadata_size = object->metadata_size();
+ entry->ref_count = object->ref_count();
+ entry->create_time = object->create_time();
+ entry->construct_duration = object->construct_duration();
+ entry->state = object->digest()->size() == 0 ? ObjectState::PLASMA_CREATED
+ : ObjectState::PLASMA_SEALED;
+ (*objects)[object_id] = std::move(entry);
+ }
+ return Status::OK();
+}
+
// Connect messages.
Status SendConnectRequest(int sock) {
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 1665f0c..057ba1c 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -141,6 +141,16 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
bool* has_object);
+/* Plasma List message functions. */
+
+Status SendListRequest(int sock);
+
+Status ReadListRequest(uint8_t* data, size_t size);
+
+Status SendListReply(int sock, const ObjectTable& objects);
+
+Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects);
+
/* Plasma Connect message functions. */
Status SendConnectRequest(int sock);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 8cef3e3..58f8d01 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -43,6 +43,7 @@
#include <sys/un.h>
#include <unistd.h>
+#include <ctime>
#include <deque>
#include <memory>
#include <string>
@@ -214,6 +215,8 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
entry->offset = offset;
entry->state = ObjectState::PLASMA_CREATED;
entry->device_num = device_num;
+ entry->create_time = std::time(nullptr);
+ entry->construct_duration = -1;
#ifdef PLASMA_GPU
if (device_num != 0) {
DCHECK_OK(gpu_handle->ExportForIpc(&entry->ipc_handle));
@@ -445,6 +448,8 @@ void PlasmaStore::SealObject(const ObjectID& object_id, unsigned char digest[])
entry->state = ObjectState::PLASMA_SEALED;
// Set the object digest.
std::memcpy(&entry->digest[0], &digest[0], kDigestSize);
+ // Set object construction duration.
+ entry->construct_duration = std::time(nullptr) - entry->create_time;
// Inform all subscribers that a new object has been sealed.
ObjectInfoT info;
info.object_id = object_id.binary();
@@ -784,6 +789,10 @@ Status PlasmaStore::ProcessMessage(Client* client) {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
}
} break;
+ case fb::MessageType::PlasmaListRequest: {
+ RETURN_NOT_OK(ReadListRequest(input, input_size));
+ HANDLE_SIGPIPE(SendListReply(client->fd, store_info_.objects), client->fd);
+ } break;
case fb::MessageType::PlasmaSealRequest: {
unsigned char digest[kDigestSize];
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0]));
diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst
index 6adc470..09837cf 100644
--- a/python/doc/source/plasma.rst
+++ b/python/doc/source/plasma.rst
@@ -209,6 +209,49 @@ milliseconds). After the timeout, the interpreter will yield control back.
b'\x01\x02\x03'
+Listing objects in the store
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The objects in the store can be listed in the following way (note that
+this functionality is currently experimental and the concrete representation
+of the object info might change in the future):
+
+.. code-block:: python
+
+ import pyarrow.plasma as plasma
+ import time
+
+ client = plasma.connect("/tmp/plasma", "", 0)
+
+ client.put("hello, world")
+ # Sleep a little so we get different creation times
+ time.sleep(2)
+ client.put("another object")
+ # Create an object that is not sealed yet
+ object_id = plasma.ObjectID.from_random()
+ client.create(object_id, 100)
+ print(client.list())
+
+ >>> {ObjectID(4cba8f80c54c6d265b46c2cdfcee6e32348b12be): {'construct_duration': 0,
+ >>> 'create_time': 1535223642,
+ >>> 'data_size': 460,
+ >>> 'metadata_size': 0,
+ >>> 'ref_count': 0,
+ >>> 'state': 'sealed'},
+ >>> ObjectID(a7598230b0c26464c9d9c99ae14773ee81485428): {'construct_duration': 0,
+ >>> 'create_time': 1535223644,
+ >>> 'data_size': 460,
+ >>> 'metadata_size': 0,
+ >>> 'ref_count': 0,
+ >>> 'state': 'sealed'},
+ >>> ObjectID(e603ab0c92098ebf08f90bfcea33ff98f6476870): {'construct_duration': -1,
+ >>> 'create_time': 1535223644,
+ >>> 'data_size': 100,
+ >>> 'metadata_size': 0,
+ >>> 'ref_count': 1,
+ >>> 'state': 'created'}}
+
+
Using Arrow and Pandas with Plasma
----------------------------------
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 86a6232..783fbcb 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -23,7 +23,9 @@ from libcpp cimport bool as c_bool, nullptr
from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
+from libcpp.unordered_map cimport unordered_map
from libc.stdint cimport int64_t, uint8_t, uintptr_t
+from cython.operator cimport dereference as deref, preincrement as inc
from cpython.pycapsule cimport *
import collections
@@ -62,6 +64,26 @@ cdef extern from "plasma/common.h" nogil:
int type
int location
+ cdef enum CObjectState" plasma::ObjectState":
+ PLASMA_CREATED" plasma::ObjectState::PLASMA_CREATED"
+ PLASMA_SEALED" plasma::ObjectState::PLASMA_SEALED"
+
+ cdef struct CObjectTableEntry" plasma::ObjectTableEntry":
+ int fd
+ int device_num
+ int64_t map_size
+ ptrdiff_t offset
+ uint8_t* pointer
+ int64_t data_size
+ int64_t metadata_size
+ int ref_count
+ int64_t create_time
+ int64_t construct_duration
+ CObjectState state
+
+ ctypedef unordered_map[CUniqueID, unique_ptr[CObjectTableEntry]] \
+ CObjectTable" plasma::ObjectTable"
+
cdef extern from "plasma/common.h":
cdef int64_t kDigestSize" plasma::kDigestSize"
@@ -101,6 +123,8 @@ cdef extern from "plasma/client.h" nogil:
CStatus Contains(const CUniqueID& object_id, c_bool* has_object)
+ CStatus List(CObjectTable* objects)
+
CStatus Subscribe(int* fd)
CStatus GetNotification(int fd, CUniqueID* object_id,
@@ -678,6 +702,60 @@ cdef class PlasmaClient:
with nogil:
check_status(self.client.get().Delete(ids))
+ def list(self):
+ """
+ Experimental: List the objects in the store.
+
+ Returns
+ -------
+ dict
+ Dictionary from ObjectIDs to an "info" dictionary describing the
+ object. The "info" dictionary has the following entries:
+
+ data_size
+ size of the object in bytes
+
+ metadata_size
+ size of the object metadata in bytes
+
+ ref_count
+ Number of clients referencing the object buffer
+
+ create_time
+ Unix timestamp of the creation of the object
+
+ construct_duration
+ Time the creation of the object took in seconds
+
+ state
+ "created" if the object is still being created and
+ "sealed" if it is already sealed
+ """
+ cdef CObjectTable objects
+ with nogil:
+ check_status(self.client.get().List(&objects))
+ result = dict()
+ cdef ObjectID object_id
+ cdef CObjectTableEntry entry
+ it = objects.begin()
+ while it != objects.end():
+ object_id = ObjectID(deref(it).first.binary())
+ entry = deref(deref(it).second)
+ if entry.state == CObjectState.PLASMA_CREATED:
+ state = "created"
+ else:
+ state = "sealed"
+ result[object_id] = {
+ "data_size": entry.data_size,
+ "metadata_size": entry.metadata_size,
+ "ref_count": entry.ref_count,
+ "create_time": entry.create_time,
+ "construct_duration": entry.construct_duration,
+ "state": state
+ }
+ inc(it)
+ return result
+
def connect(store_socket_name, manager_socket_name, int release_delay,
int num_retries=-1):
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 7cb1c0e..2510298 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -19,12 +19,14 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
+import math
import os
import pytest
import random
import signal
import subprocess
import sys
+import time
import numpy as np
import pyarrow as pa
@@ -794,6 +796,54 @@ def test_plasma_client_sharing():
@pytest.mark.plasma
+def test_plasma_list():
+ import pyarrow.plasma as plasma
+
+ with plasma.start_plasma_store(
+ plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
+ as (plasma_store_name, p):
+ plasma_client = plasma.connect(plasma_store_name, "", 0)
+
+ # Test sizes
+ u, _, _ = create_object(plasma_client, 11, metadata_size=7, seal=False)
+ l1 = plasma_client.list()
+ assert l1[u]["data_size"] == 11
+ assert l1[u]["metadata_size"] == 7
+
+ # Test ref_count
+ v = plasma_client.put(np.zeros(3))
+ l2 = plasma_client.list()
+ # Ref count has already been released
+ assert l2[v]["ref_count"] == 0
+ a = plasma_client.get(v)
+ l3 = plasma_client.list()
+ assert l3[v]["ref_count"] == 1
+ del a
+
+ # Test state
+ w, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
+ l4 = plasma_client.list()
+ assert l4[w]["state"] == "created"
+ plasma_client.seal(w)
+ l5 = plasma_client.list()
+ assert l5[w]["state"] == "sealed"
+
+ # Test timestamps
+ t1 = time.time()
+ x, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
+ t2 = time.time()
+ l6 = plasma_client.list()
+ assert math.floor(t1) <= l6[x]["create_time"] <= math.ceil(t2)
+ time.sleep(2.0)
+ t3 = time.time()
+ plasma_client.seal(x)
+ t4 = time.time()
+ l7 = plasma_client.list()
+ assert math.floor(t3 - t2) <= l7[x]["construct_duration"]
+ assert l7[x]["construct_duration"] <= math.ceil(t4 - t1)
+
+
+@pytest.mark.plasma
def test_object_id_randomness():
cmd = "from pyarrow import plasma; print(plasma.ObjectID.from_random())"
first_object_id = subprocess.check_output(["python", "-c", cmd])