You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/08/26 20:24:12 UTC

[arrow] branch master updated: ARROW-3116: [Plasma] Add "ls" to object store

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 628b74b  ARROW-3116: [Plasma] Add "ls" to object store
628b74b is described below

commit 628b74b0de0a3d7ea6d7f424f605922fb8637b46
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Sun Aug 26 13:24:03 2018 -0700

    ARROW-3116: [Plasma] Add "ls" to object store
    
    This adds plasma_client.list to the plasma client API.
    
    It can be used like so:
    
    ```python
      import pyarrow.plasma as plasma
      import time
    
      client = plasma.connect("/tmp/plasma", "", 0)
    
      client.put("hello, world")
      # Sleep a little so we get different creation times
      time.sleep(2)
      client.put("another object")
      # Create an object that is not sealed yet
      object_id = plasma.ObjectID.from_random()
      client.create(object_id, 100)
      print(client.list())
    
      >>> {ObjectID(4cba8f80c54c6d265b46c2cdfcee6e32348b12be): {'construct_duration': 0,
      >>>  'create_time': 1535223642,
      >>>  'data_size': 460,
      >>>  'metadata_size': 0,
      >>>  'ref_count': 0,
      >>>  'state': 'sealed'},
      >>> ObjectID(a7598230b0c26464c9d9c99ae14773ee81485428): {'construct_duration': 0,
      >>>  'create_time': 1535223644,
      >>>  'data_size': 460,
      >>>  'metadata_size': 0,
      >>>  'ref_count': 0,
      >>>  'state': 'sealed'},
      >>> ObjectID(e603ab0c92098ebf08f90bfcea33ff98f6476870): {'construct_duration': -1,
      >>>  'create_time': 1535223644,
      >>>  'data_size': 100,
      >>>  'metadata_size': 0,
      >>>  'ref_count': 1,
      >>>  'state': 'created'}}
    ```
    
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #2470 from pcmoritz/plasma-list and squashes the following commits:
    
    5ff4e355 <Philipp Moritz> fix
    32c36a75 <Philipp Moritz> minor fix
    a58db5bc <Philipp Moritz> add more documentation
    1f0c91de <Philipp Moritz> fix
    1f384ee9 <Philipp Moritz> add documentation
    0958e4d1 <Philipp Moritz> merge
    ce122957 <Philipp Moritz> add test
    a6ba6f6f <Philipp Moritz> fix
    62772a87 <Philipp Moritz> add timestamp
    bb8d52f0 <Philipp Moritz> fix
    6572943d <Philipp Moritz> fix
    d343d4fb <Philipp Moritz> linting
    96040ad4 <Philipp Moritz> cleanups
    6f29cec0 <Philipp Moritz> fix test
    79a2b549 <Philipp Moritz> make variables unique
    93a9ca53 <Philipp Moritz> fix linting
    f7f36068 <Philipp Moritz> add test cases and fixes
    189928be <Philipp Moritz> add more fields
    deb06b42 <Philipp Moritz> get list working
    ac1fae62 <Philipp Moritz> update
    a07db83b <Philipp Moritz> add list command to store
    400d326f <Philipp Moritz> create object list
---
 cpp/src/plasma/client.cc            | 11 ++++++
 cpp/src/plasma/client.h             | 15 +++++++
 cpp/src/plasma/common.h             | 52 +++++++++++++++++++++++++
 cpp/src/plasma/format/common.fbs    |  5 ++-
 cpp/src/plasma/format/plasma.fbs    | 12 ++++++
 cpp/src/plasma/plasma.h             | 43 +-------------------
 cpp/src/plasma/protocol.cc          | 49 +++++++++++++++++++++++
 cpp/src/plasma/protocol.h           | 10 +++++
 cpp/src/plasma/store.cc             |  9 +++++
 python/doc/source/plasma.rst        | 43 ++++++++++++++++++++
 python/pyarrow/_plasma.pyx          | 78 +++++++++++++++++++++++++++++++++++++
 python/pyarrow/tests/test_plasma.py | 50 ++++++++++++++++++++++++
 12 files changed, 334 insertions(+), 43 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 3c30f3e..c5f372f 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -184,6 +184,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
 
   Status Contains(const ObjectID& object_id, bool* has_object);
 
+  Status List(ObjectTable* objects);
+
   Status Abort(const ObjectID& object_id);
 
   Status Seal(const ObjectID& object_id);
@@ -705,6 +707,13 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
   return Status::OK();
 }
 
+Status PlasmaClient::Impl::List(ObjectTable* objects) {
+  RETURN_NOT_OK(SendListRequest(store_conn_));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer));
+  return ReadListReply(buffer.data(), buffer.size(), objects);
+}
+
 static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) {
   XXH64_state_t hash_state;
   XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
@@ -1057,6 +1066,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
   return impl_->Contains(object_id, has_object);
 }
 
+Status PlasmaClient::List(ObjectTable* objects) { return impl_->List(objects); }
+
 Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(object_id); }
 
 Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); }
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index fe00193..a95b992 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -151,6 +151,21 @@ class ARROW_EXPORT PlasmaClient {
   /// \return The return status.
   Status Contains(const ObjectID& object_id, bool* has_object);
 
+  /// List all the objects in the object store.
+  ///
+  /// This API is experimental and might change in the future.
+  ///
+  /// \param[out] objects ObjectTable of objects in the store. For each entry
+  ///             in the map, the following fields are available:
+  ///             - metadata_size: Size of the object metadata in bytes
+  ///             - data_size: Size of the object data in bytes
+  ///             - ref_count: Number of clients referencing the object buffer
+  ///             - create_time: Unix timestamp of the object creation
+  ///             - construct_duration: Object creation time in seconds
+  ///             - state: Is the object still being created or already sealed?
+  /// \return The return status.
+  Status List(ObjectTable* objects);
+
   /// Abort an unsealed object in the object store. If the abort succeeds, then
   /// it will be as if the object was never created at all. The unsealed object
   /// must have only a single reference (the one that would have been removed by
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index a8cb931..9c3c0c9 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -18,13 +18,17 @@
 #ifndef PLASMA_COMMON_H
 #define PLASMA_COMMON_H
 
+#include <stddef.h>
+
 #include <cstring>
+#include <memory>
 #include <string>
 // TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++
 // and get rid of the next three lines:
 #ifndef __STDC_FORMAT_MACROS
 #define __STDC_FORMAT_MACROS
 #endif
+#include <unordered_map>
 
 #include "plasma/compat.h"
 
@@ -83,6 +87,54 @@ struct ObjectRequest {
   ObjectLocation location;
 };
 
+enum class ObjectState : int {
+  /// Object was created but not sealed in the local Plasma Store.
+  PLASMA_CREATED = 1,
+  /// Object is sealed and stored in the local Plasma Store.
+  PLASMA_SEALED
+};
+
+/// This type is used by the Plasma store. It is here because it is exposed to
+/// the eviction policy.
+struct ObjectTableEntry {
+  ObjectTableEntry();
+
+  ~ObjectTableEntry();
+
+  /// Memory mapped file containing the object.
+  int fd;
+  /// Device number.
+  int device_num;
+  /// Size of the underlying map.
+  int64_t map_size;
+  /// Offset from the base of the mmap.
+  ptrdiff_t offset;
+  /// Pointer to the object data. Needed to free the object.
+  uint8_t* pointer;
+  /// Size of the object in bytes.
+  int64_t data_size;
+  /// Size of the object metadata in bytes.
+  int64_t metadata_size;
+#ifdef PLASMA_GPU
+  /// IPC GPU handle to share with clients.
+  std::shared_ptr<CudaIpcMemHandle> ipc_handle;
+#endif
+  /// Number of clients currently using this object.
+  int ref_count;
+  /// Unix epoch of when this object was created.
+  int64_t create_time;
+  /// How long creation of this object took.
+  int64_t construct_duration;
+
+  /// The state of the object, e.g., whether it is open or sealed.
+  ObjectState state;
+  /// The digest of the object. Used to see if two objects are the same.
+  unsigned char digest[kDigestSize];
+};
+
+/// Mapping from ObjectIDs to information about the object.
+typedef std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>> ObjectTable;
+
 /// Globally accessible reference to plasma store configuration.
 /// TODO(pcm): This can be avoided with some refactoring of existing code
 /// by making it possible to pass a context object through dlmalloc.
diff --git a/cpp/src/plasma/format/common.fbs b/cpp/src/plasma/format/common.fbs
index 7f66bf6..818827a 100644
--- a/cpp/src/plasma/format/common.fbs
+++ b/cpp/src/plasma/format/common.fbs
@@ -25,11 +25,14 @@ table ObjectInfo {
   data_size: long;
   // Number of bytes the metadata of this object occupies in memory.
   metadata_size: long;
+  // Number of clients using the objects.
+  ref_count: int;
   // Unix epoch of when this object was created.
   create_time: long;
   // How long creation of this object took.
   construct_duration: long;
-  // Hash of the object content.
+  // Hash of the object content. If the object is not sealed yet this is
+  // an empty string.
   digest: string;
   // Specifies if this object was deleted or added.
   is_deletion: bool;
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 082ae9c..ded714a 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+include "common.fbs";
+
 // Plasma protocol specification
 namespace plasma.flatbuf;
 
@@ -44,6 +46,9 @@ enum MessageType:long {
   // See if the store contains an object (will be deprecated).
   PlasmaContainsRequest,
   PlasmaContainsReply,
+  // List all objects in the store.
+  PlasmaListRequest,
+  PlasmaListReply,
   // Get information for a newly connecting client.
   PlasmaConnectRequest,
   PlasmaConnectReply,
@@ -257,6 +262,13 @@ table PlasmaContainsReply {
   has_object: int;
 }
 
+table PlasmaListRequest {
+}
+
+table PlasmaListReply {
+  objects: [ObjectInfo];
+}
+
 // PlasmaConnect is used by a plasma client the first time it connects with the
 // store. This is not really necessary, but is used to get some information
 // about the store such as its memory capacity.
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 57ba882..b4760fc 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -95,13 +95,6 @@ struct PlasmaObject {
   int device_num;
 };
 
-enum class ObjectState : int {
-  /// Object was created but not sealed in the local Plasma Store.
-  PLASMA_CREATED = 1,
-  /// Object is sealed and stored in the local Plasma Store.
-  PLASMA_SEALED
-};
-
 enum class ObjectStatus : int {
   /// The object was not found.
   OBJECT_NOT_FOUND = 0,
@@ -109,44 +102,10 @@ enum class ObjectStatus : int {
   OBJECT_FOUND = 1
 };
 
-/// This type is used by the Plasma store. It is here because it is exposed to
-/// the eviction policy.
-struct ObjectTableEntry {
-  ObjectTableEntry();
-
-  ~ObjectTableEntry();
-
-  /// Memory mapped file containing the object.
-  int fd;
-  /// Device number.
-  int device_num;
-  /// Size of the underlying map.
-  int64_t map_size;
-  /// Offset from the base of the mmap.
-  ptrdiff_t offset;
-  /// Pointer to the object data. Needed to free the object.
-  uint8_t* pointer;
-  /// Size of the object in bytes.
-  int64_t data_size;
-  /// Size of the object metadata in bytes.
-  int64_t metadata_size;
-#ifdef PLASMA_GPU
-  /// IPC GPU handle to share with clients.
-  std::shared_ptr<CudaIpcMemHandle> ipc_handle;
-#endif
-  /// Number of clients currently using this object.
-  int ref_count;
-
-  /// The state of the object, e.g., whether it is open or sealed.
-  ObjectState state;
-  /// The digest of the object. Used to see if two objects are the same.
-  unsigned char digest[kDigestSize];
-};
-
 /// The plasma store information that is exposed to the eviction policy.
 struct PlasmaStoreInfo {
   /// Objects that are in the Plasma store.
-  std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>> objects;
+  ObjectTable objects;
   /// The amount of memory (in bytes) that we allow to be allocated in the
   /// store.
   int64_t memory_capacity;
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index da56249..5b93b65 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -17,6 +17,8 @@
 
 #include "plasma/protocol.h"
 
+#include <utility>
+
 #include "flatbuffers/flatbuffers.h"
 #include "plasma/plasma_generated.h"
 
@@ -410,6 +412,53 @@ Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
   return Status::OK();
 }
 
+// List messages.
+
+Status SendListRequest(int sock) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = fb::CreatePlasmaListRequest(fbb);
+  return PlasmaSend(sock, MessageType::PlasmaListRequest, &fbb, message);
+}
+
+Status ReadListRequest(uint8_t* data, size_t size) { return Status::OK(); }
+
+Status SendListReply(int sock, const ObjectTable& objects) {
+  flatbuffers::FlatBufferBuilder fbb;
+  std::vector<flatbuffers::Offset<fb::ObjectInfo>> object_infos;
+  for (auto const& entry : objects) {
+    auto digest = entry.second->state == ObjectState::PLASMA_CREATED
+                      ? fbb.CreateString("")
+                      : fbb.CreateString(reinterpret_cast<char*>(entry.second->digest),
+                                         kDigestSize);
+    auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.binary()),
+                                     entry.second->data_size, entry.second->metadata_size,
+                                     entry.second->ref_count, entry.second->create_time,
+                                     entry.second->construct_duration, digest);
+    object_infos.push_back(info);
+  }
+  auto message = fb::CreatePlasmaListReply(fbb, fbb.CreateVector(object_infos));
+  return PlasmaSend(sock, MessageType::PlasmaListReply, &fbb, message);
+}
+
+Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data);
+  DCHECK(VerifyFlatbuffer(message, data, size));
+  for (auto const& object : *message->objects()) {
+    ObjectID object_id = ObjectID::from_binary(object->object_id()->str());
+    auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+    entry->data_size = object->data_size();
+    entry->metadata_size = object->metadata_size();
+    entry->ref_count = object->ref_count();
+    entry->create_time = object->create_time();
+    entry->construct_duration = object->construct_duration();
+    entry->state = object->digest()->size() == 0 ? ObjectState::PLASMA_CREATED
+                                                 : ObjectState::PLASMA_SEALED;
+    (*objects)[object_id] = std::move(entry);
+  }
+  return Status::OK();
+}
+
 // Connect messages.
 
 Status SendConnectRequest(int sock) {
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 1665f0c..057ba1c 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -141,6 +141,16 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
 Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
                          bool* has_object);
 
+/* Plasma List message functions. */
+
+Status SendListRequest(int sock);
+
+Status ReadListRequest(uint8_t* data, size_t size);
+
+Status SendListReply(int sock, const ObjectTable& objects);
+
+Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects);
+
 /* Plasma Connect message functions. */
 
 Status SendConnectRequest(int sock);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 8cef3e3..58f8d01 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -43,6 +43,7 @@
 #include <sys/un.h>
 #include <unistd.h>
 
+#include <ctime>
 #include <deque>
 #include <memory>
 #include <string>
@@ -214,6 +215,8 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
   entry->offset = offset;
   entry->state = ObjectState::PLASMA_CREATED;
   entry->device_num = device_num;
+  entry->create_time = std::time(nullptr);
+  entry->construct_duration = -1;
 #ifdef PLASMA_GPU
   if (device_num != 0) {
     DCHECK_OK(gpu_handle->ExportForIpc(&entry->ipc_handle));
@@ -445,6 +448,8 @@ void PlasmaStore::SealObject(const ObjectID& object_id, unsigned char digest[])
   entry->state = ObjectState::PLASMA_SEALED;
   // Set the object digest.
   std::memcpy(&entry->digest[0], &digest[0], kDigestSize);
+  // Set object construction duration.
+  entry->construct_duration = std::time(nullptr) - entry->create_time;
   // Inform all subscribers that a new object has been sealed.
   ObjectInfoT info;
   info.object_id = object_id.binary();
@@ -784,6 +789,10 @@ Status PlasmaStore::ProcessMessage(Client* client) {
         HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
       }
     } break;
+    case fb::MessageType::PlasmaListRequest: {
+      RETURN_NOT_OK(ReadListRequest(input, input_size));
+      HANDLE_SIGPIPE(SendListReply(client->fd, store_info_.objects), client->fd);
+    } break;
     case fb::MessageType::PlasmaSealRequest: {
       unsigned char digest[kDigestSize];
       RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0]));
diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst
index 6adc470..09837cf 100644
--- a/python/doc/source/plasma.rst
+++ b/python/doc/source/plasma.rst
@@ -209,6 +209,49 @@ milliseconds). After the timeout, the interpreter will yield control back.
   b'\x01\x02\x03'
 
 
+Listing objects in the store
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The objects in the store can be listed in the following way (note that
+this functionality is currently experimental and the concrete representation
+of the object info might change in the future):
+
+.. code-block:: python
+
+  import pyarrow.plasma as plasma
+  import time
+
+  client = plasma.connect("/tmp/plasma", "", 0)
+
+  client.put("hello, world")
+  # Sleep a little so we get different creation times
+  time.sleep(2)
+  client.put("another object")
+  # Create an object that is not sealed yet
+  object_id = plasma.ObjectID.from_random()
+  client.create(object_id, 100)
+  print(client.list())
+
+  >>> {ObjectID(4cba8f80c54c6d265b46c2cdfcee6e32348b12be): {'construct_duration': 0,
+  >>>  'create_time': 1535223642,
+  >>>  'data_size': 460,
+  >>>  'metadata_size': 0,
+  >>>  'ref_count': 0,
+  >>>  'state': 'sealed'},
+  >>> ObjectID(a7598230b0c26464c9d9c99ae14773ee81485428): {'construct_duration': 0,
+  >>>  'create_time': 1535223644,
+  >>>  'data_size': 460,
+  >>>  'metadata_size': 0,
+  >>>  'ref_count': 0,
+  >>>  'state': 'sealed'},
+  >>> ObjectID(e603ab0c92098ebf08f90bfcea33ff98f6476870): {'construct_duration': -1,
+  >>>  'create_time': 1535223644,
+  >>>  'data_size': 100,
+  >>>  'metadata_size': 0,
+  >>>  'ref_count': 1,
+  >>>  'state': 'created'}}
+
+
 Using Arrow and Pandas with Plasma
 ----------------------------------
 
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 86a6232..783fbcb 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -23,7 +23,9 @@ from libcpp cimport bool as c_bool, nullptr
 from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
 from libcpp.string cimport string as c_string
 from libcpp.vector cimport vector as c_vector
+from libcpp.unordered_map cimport unordered_map
 from libc.stdint cimport int64_t, uint8_t, uintptr_t
+from cython.operator cimport dereference as deref, preincrement as inc
 from cpython.pycapsule cimport *
 
 import collections
@@ -62,6 +64,26 @@ cdef extern from "plasma/common.h" nogil:
         int type
         int location
 
+    cdef enum CObjectState" plasma::ObjectState":
+        PLASMA_CREATED" plasma::ObjectState::PLASMA_CREATED"
+        PLASMA_SEALED" plasma::ObjectState::PLASMA_SEALED"
+
+    cdef struct CObjectTableEntry" plasma::ObjectTableEntry":
+        int fd
+        int device_num
+        int64_t map_size
+        ptrdiff_t offset
+        uint8_t* pointer
+        int64_t data_size
+        int64_t metadata_size
+        int ref_count
+        int64_t create_time
+        int64_t construct_duration
+        CObjectState state
+
+    ctypedef unordered_map[CUniqueID, unique_ptr[CObjectTableEntry]] \
+        CObjectTable" plasma::ObjectTable"
+
 
 cdef extern from "plasma/common.h":
     cdef int64_t kDigestSize" plasma::kDigestSize"
@@ -101,6 +123,8 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Contains(const CUniqueID& object_id, c_bool* has_object)
 
+        CStatus List(CObjectTable* objects)
+
         CStatus Subscribe(int* fd)
 
         CStatus GetNotification(int fd, CUniqueID* object_id,
@@ -678,6 +702,60 @@ cdef class PlasmaClient:
         with nogil:
             check_status(self.client.get().Delete(ids))
 
+    def list(self):
+        """
+        Experimental: List the objects in the store.
+
+        Returns
+        -------
+        dict
+            Dictionary from ObjectIDs to an "info" dictionary describing the
+            object. The "info" dictionary has the following entries:
+
+            data_size
+              size of the object in bytes
+
+            metadata_size
+              size of the object metadata in bytes
+
+            ref_count
+              Number of clients referencing the object buffer
+
+            create_time
+              Unix timestamp of the creation of the object
+
+            construct_duration
+              Time the creation of the object took in seconds
+
+            state
+              "created" if the object is still being created and
+              "sealed" if it is already sealed
+        """
+        cdef CObjectTable objects
+        with nogil:
+            check_status(self.client.get().List(&objects))
+        result = dict()
+        cdef ObjectID object_id
+        cdef CObjectTableEntry entry
+        it = objects.begin()
+        while it != objects.end():
+            object_id = ObjectID(deref(it).first.binary())
+            entry = deref(deref(it).second)
+            if entry.state == CObjectState.PLASMA_CREATED:
+                state = "created"
+            else:
+                state = "sealed"
+            result[object_id] = {
+                "data_size": entry.data_size,
+                "metadata_size": entry.metadata_size,
+                "ref_count": entry.ref_count,
+                "create_time": entry.create_time,
+                "construct_duration": entry.construct_duration,
+                "state": state
+            }
+            inc(it)
+        return result
+
 
 def connect(store_socket_name, manager_socket_name, int release_delay,
             int num_retries=-1):
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 7cb1c0e..2510298 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -19,12 +19,14 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
+import math
 import os
 import pytest
 import random
 import signal
 import subprocess
 import sys
+import time
 
 import numpy as np
 import pyarrow as pa
@@ -794,6 +796,54 @@ def test_plasma_client_sharing():
 
 
 @pytest.mark.plasma
+def test_plasma_list():
+    import pyarrow.plasma as plasma
+
+    with plasma.start_plasma_store(
+            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
+            as (plasma_store_name, p):
+        plasma_client = plasma.connect(plasma_store_name, "", 0)
+
+        # Test sizes
+        u, _, _ = create_object(plasma_client, 11, metadata_size=7, seal=False)
+        l1 = plasma_client.list()
+        assert l1[u]["data_size"] == 11
+        assert l1[u]["metadata_size"] == 7
+
+        # Test ref_count
+        v = plasma_client.put(np.zeros(3))
+        l2 = plasma_client.list()
+        # Ref count has already been released
+        assert l2[v]["ref_count"] == 0
+        a = plasma_client.get(v)
+        l3 = plasma_client.list()
+        assert l3[v]["ref_count"] == 1
+        del a
+
+        # Test state
+        w, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
+        l4 = plasma_client.list()
+        assert l4[w]["state"] == "created"
+        plasma_client.seal(w)
+        l5 = plasma_client.list()
+        assert l5[w]["state"] == "sealed"
+
+        # Test timestamps
+        t1 = time.time()
+        x, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
+        t2 = time.time()
+        l6 = plasma_client.list()
+        assert math.floor(t1) <= l6[x]["create_time"] <= math.ceil(t2)
+        time.sleep(2.0)
+        t3 = time.time()
+        plasma_client.seal(x)
+        t4 = time.time()
+        l7 = plasma_client.list()
+        assert math.floor(t3 - t2) <= l7[x]["construct_duration"]
+        assert l7[x]["construct_duration"] <= math.ceil(t4 - t1)
+
+
+@pytest.mark.plasma
 def test_object_id_randomness():
     cmd = "from pyarrow import plasma; print(plasma.ObjectID.from_random())"
     first_object_id = subprocess.check_output(["python", "-c", cmd])