You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/12/03 22:34:15 UTC

[arrow] branch master updated: ARROW-2759: [Plasma] Export plasma notification socket

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6045dd2  ARROW-2759: [Plasma] Export plasma notification socket
6045dd2 is described below

commit 6045dd2a66507b0858cf7679745da5e785548f88
Author: suquark <su...@gmail.com>
AuthorDate: Mon Dec 3 14:34:04 2018 -0800

    ARROW-2759: [Plasma] Export plasma notification socket
    
    https://issues.apache.org/jira/browse/ARROW-2759
    
    Author: suquark <su...@gmail.com>
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #3008 from suquark/plasma and squashes the following commits:
    
    0bc89eba5 <Philipp Moritz> Update _plasma.pyx
    a6d598159 <suquark> fix a bug. style
    f037c319e <suquark> fix imports
    19dae23d0 <suquark> use compat
    942a62f84 <suquark> fix test
    37b656013 <suquark> fix test
    19f6b9852 <suquark> py2 compatibility
    8e636ebaf <suquark> py2 compatibility
    bbf07b94d <suquark> lint
    6005b34a5 <suquark> lint
    2719d0dc1 <suquark> lint
    048f0c2ab <suquark> Export plasma notification socket.
---
 cpp/src/plasma/client.cc            | 28 ++++++++++++++++++++-------
 cpp/src/plasma/client.h             |  3 +++
 python/pyarrow/_plasma.pyx          | 38 +++++++++++++++++++++++++++++++++++++
 python/pyarrow/compat.py            | 10 ++++++++++
 python/pyarrow/tests/test_plasma.py | 29 ++++++++++++++++++++++++++++
 5 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 0c96be0..20dc421 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -202,6 +202,9 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
 
   Status Subscribe(int* fd);
 
+  Status DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
+                            int64_t* data_size, int64_t* metadata_size);
+
   Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size,
                          int64_t* metadata_size);
 
@@ -943,13 +946,10 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
   return Status::OK();
 }
 
-Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
-                                           int64_t* data_size, int64_t* metadata_size) {
-  auto notification = ReadMessageAsync(fd);
-  if (notification == NULL) {
-    return Status::IOError("Failed to read object notification from Plasma socket");
-  }
-  auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(notification.get());
+Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
+                                              int64_t* data_size,
+                                              int64_t* metadata_size) {
+  auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(buffer);
   ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
   memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
   if (object_info->is_deletion()) {
@@ -962,6 +962,15 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
   return Status::OK();
 }
 
+Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
+                                           int64_t* data_size, int64_t* metadata_size) {
+  auto notification = ReadMessageAsync(fd);
+  if (notification == NULL) {
+    return Status::IOError("Failed to read object notification from Plasma socket");
+  }
+  return DecodeNotification(notification.get(), object_id, data_size, metadata_size);
+}
+
 Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
                                    const std::string& manager_socket_name,
                                    int release_delay, int num_retries) {
@@ -1138,6 +1147,11 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_
   return impl_->GetNotification(fd, object_id, data_size, metadata_size);
 }
 
+Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
+                                        int64_t* data_size, int64_t* metadata_size) {
+  return impl_->DecodeNotification(buffer, object_id, data_size, metadata_size);
+}
+
 Status PlasmaClient::Disconnect() { return impl_->Disconnect(); }
 
 Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 1ad09f5..9e080b7 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -246,6 +246,9 @@ class ARROW_EXPORT PlasmaClient {
   Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size,
                          int64_t* metadata_size);
 
+  Status DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
+                            int64_t* data_size, int64_t* metadata_size);
+
   /// Disconnect from the local plasma instance, including the local store and
   /// manager.
   ///
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 677e768..2fad09c 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -32,11 +32,13 @@ from cpython.pycapsule cimport *
 import collections
 import pyarrow
 import random
+import socket
 
 from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer
 from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
                                         CFixedSizeBufferWriter, CStatus)
 
+from pyarrow import compat
 
 PLASMA_WAIT_TIMEOUT = 2 ** 30
 
@@ -131,6 +133,10 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Subscribe(int* fd)
 
+        CStatus DecodeNotification(const uint8_t* buffer,
+                                   CUniqueID* object_id, int64_t* data_size,
+                                   int64_t* metadata_size)
+
         CStatus GetNotification(int fd, CUniqueID* object_id,
                                 int64_t* data_size, int64_t* metadata_size)
 
@@ -729,6 +735,38 @@ cdef class PlasmaClient:
         with nogil:
             check_status(self.client.get().Subscribe(&self.notification_fd))
 
+    def get_notification_socket(self):
+        """
+        Return the notification fd wrapped as an AF_UNIX SOCK_STREAM socket.
+        """
+        return compat.get_socket_from_fd(self.notification_fd,
+                                         family=socket.AF_UNIX,
+                                         type=socket.SOCK_STREAM)
+
+    def decode_notification(self, const uint8_t* buf):
+        """
+        Decode one notification payload (without its length prefix).
+
+        Returns
+        -------
+        ObjectID
+            The object ID that the notification refers to.
+        int
+            The data size of the object that was stored.
+        int
+            The metadata size of the object that was stored.
+        """
+        cdef CUniqueID object_id
+        cdef int64_t data_size
+        cdef int64_t metadata_size
+        with nogil:
+            check_status(self.client.get()
+                         .DecodeNotification(buf,
+                                             &object_id,
+                                             &data_size,
+                                             &metadata_size))
+        return ObjectID(object_id.binary()), data_size, metadata_size
+
     def get_next_notification(self):
         """
         Get the next notification from the notification socket.
diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py
index a481db0..068d560 100644
--- a/python/pyarrow/compat.py
+++ b/python/pyarrow/compat.py
@@ -25,6 +25,7 @@ import numpy as np
 import sys
 import six
 from six import BytesIO, StringIO, string_types as py_string
+import socket
 
 
 PY26 = sys.version_info[:2] == (2, 6)
@@ -267,4 +268,13 @@ def import_pytorch_extension():
 
 integer_types = six.integer_types + (np.integer,)
 
+
+def get_socket_from_fd(fileno, family, type):
+    if PY2:
+        socket_obj = socket.fromfd(fileno, family, type)
+        return socket.socket(family, type, _sock=socket_obj)
+    else:
+        return socket.socket(fileno=fileno, family=family, type=type)
+
+
 __all__ = []
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 69b3d9c..e3d31b7 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -25,6 +25,7 @@ import os
 import pytest
 import random
 import signal
+import struct
 import subprocess
 import sys
 import time
@@ -742,6 +743,34 @@ class TestPlasmaClient(object):
                 assert data_sizes[j] == recv_dsize
                 assert metadata_sizes[j] == recv_msize
 
+    def test_subscribe_socket(self):
+        # Subscribe to notifications from the Plasma Store.
+        self.plasma_client.subscribe()
+        rsock = self.plasma_client.get_notification_socket()
+        for i in self.SUBSCRIBE_TEST_SIZES:
+            # Create and seal i objects with random data/metadata sizes.
+            object_ids = [random_object_id() for _ in range(i)]
+            metadata_sizes = [np.random.randint(1000) for _ in range(i)]
+            data_sizes = [np.random.randint(1000) for _ in range(i)]
+
+            for j in range(i):
+                self.plasma_client.create(
+                    object_ids[j], data_sizes[j],
+                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
+                self.plasma_client.seal(object_ids[j])
+
+            # Check that we received notifications for all of the objects.
+            for j in range(i):
+                # The store is assumed not to fill up, so sizes are never -1.
+                # Length prefix: 'Q' is 8 bytes, unlike platform-sized 'L'.
+                msg_len, = struct.unpack('Q', rsock.recv(8))
+                content = rsock.recv(msg_len)
+                recv_objid, recv_dsize, recv_msize = (
+                    self.plasma_client.decode_notification(content))
+                assert object_ids[j] == recv_objid
+                assert data_sizes[j] == recv_dsize
+                assert metadata_sizes[j] == recv_msize
+
     def test_subscribe_deletions(self):
         # Subscribe to notifications from the Plasma Store. We use
         # plasma_client2 to make sure that all used objects will get evicted