You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/12/03 22:34:15 UTC
[arrow] branch master updated: ARROW-2759: [Plasma] Export plasma notification socket
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6045dd2 ARROW-2759: [Plasma] Export plasma notification socket
6045dd2 is described below
commit 6045dd2a66507b0858cf7679745da5e785548f88
Author: suquark <su...@gmail.com>
AuthorDate: Mon Dec 3 14:34:04 2018 -0800
ARROW-2759: [Plasma] Export plasma notification socket
https://issues.apache.org/jira/browse/ARROW-2759
Author: suquark <su...@gmail.com>
Author: Philipp Moritz <pc...@gmail.com>
Closes #3008 from suquark/plasma and squashes the following commits:
0bc89eba5 <Philipp Moritz> Update _plasma.pyx
a6d598159 <suquark> fix a bug. style
f037c319e <suquark> fix imports
19dae23d0 <suquark> use compat
942a62f84 <suquark> fix test
37b656013 <suquark> fix test
19f6b9852 <suquark> py2 compatibility
8e636ebaf <suquark> py2 compatibility
bbf07b94d <suquark> lint
6005b34a5 <suquark> lint
2719d0dc1 <suquark> lint
048f0c2ab <suquark> Export plasma notification socket.
---
cpp/src/plasma/client.cc | 28 ++++++++++++++++++++-------
cpp/src/plasma/client.h | 3 +++
python/pyarrow/_plasma.pyx | 38 +++++++++++++++++++++++++++++++++++++
python/pyarrow/compat.py | 10 ++++++++++
python/pyarrow/tests/test_plasma.py | 29 ++++++++++++++++++++++++++++
5 files changed, 101 insertions(+), 7 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 0c96be0..20dc421 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -202,6 +202,9 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status Subscribe(int* fd);
+ Status DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
+ int64_t* data_size, int64_t* metadata_size);
+
Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size,
int64_t* metadata_size);
@@ -943,13 +946,10 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
return Status::OK();
}
-Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
- int64_t* data_size, int64_t* metadata_size) {
- auto notification = ReadMessageAsync(fd);
- if (notification == NULL) {
- return Status::IOError("Failed to read object notification from Plasma socket");
- }
- auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(notification.get());
+Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
+ int64_t* data_size,
+ int64_t* metadata_size) {
+ auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(buffer);
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
if (object_info->is_deletion()) {
@@ -962,6 +962,15 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
return Status::OK();
}
+// Read one notification message from the subscription socket `fd` and decode
+// it into the out-parameters. Returns an IOError if the message could not be
+// read; otherwise returns whatever DecodeNotification returns.
+Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
+                                            int64_t* data_size, int64_t* metadata_size) {
+  auto notification = ReadMessageAsync(fd);
+  // A null result signals that no message could be read from the socket.
+  if (notification == nullptr) {
+    return Status::IOError("Failed to read object notification from Plasma socket");
+  }
+  // Decoding is shared with the public DecodeNotification entry point so that
+  // callers reading the socket themselves get identical results.
+  return DecodeNotification(notification.get(), object_id, data_size, metadata_size);
+}
+
Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
const std::string& manager_socket_name,
int release_delay, int num_retries) {
@@ -1138,6 +1147,11 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_
return impl_->GetNotification(fd, object_id, data_size, metadata_size);
}
+// Public entry point for decoding a notification buffer. The client keeps no
+// decoding logic of its own: the buffer and all out-parameters are handed
+// straight to the implementation object, and its status is returned as-is.
+Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
+                                         int64_t* data_size,
+                                         int64_t* metadata_size) {
+  Impl& impl = *impl_;
+  return impl.DecodeNotification(buffer, object_id, data_size, metadata_size);
+}
+
Status PlasmaClient::Disconnect() { return impl_->Disconnect(); }
Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 1ad09f5..9e080b7 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -246,6 +246,9 @@ class ARROW_EXPORT PlasmaClient {
Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size,
int64_t* metadata_size);
+ /// Decode a notification message whose bytes were read from the
+ /// notification socket by the caller (the message payload only, without
+ /// the 8-byte length field that precedes it on the socket).
+ ///
+ /// \param buffer The serialized notification message.
+ /// \param object_id Out parameter for the ID of the object the notification
+ ///        refers to.
+ /// \param data_size Out parameter for the object's data size (presumably a
+ ///        sentinel such as -1 for deletion notifications -- TODO confirm).
+ /// \param metadata_size Out parameter for the object's metadata size.
+ /// \return The return status.
+ ///
+ /// \note The buffer is parsed without being verified first, so it must be a
+ ///       well-formed notification; a malformed object ID fails the
+ ///       ARROW_CHECK in the implementation.
+ Status DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
+ int64_t* data_size, int64_t* metadata_size);
+
/// Disconnect from the local plasma instance, including the local store and
/// manager.
///
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 677e768..2fad09c 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -32,11 +32,13 @@ from cpython.pycapsule cimport *
import collections
import pyarrow
import random
+import socket
from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer
from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
CFixedSizeBufferWriter, CStatus)
+from pyarrow import compat
PLASMA_WAIT_TIMEOUT = 2 ** 30
@@ -131,6 +133,10 @@ cdef extern from "plasma/client.h" nogil:
CStatus Subscribe(int* fd)
+ CStatus DecodeNotification(const uint8_t* buffer,
+ CUniqueID* object_id, int64_t* data_size,
+ int64_t* metadata_size)
+
CStatus GetNotification(int fd, CUniqueID* object_id,
int64_t* data_size, int64_t* metadata_size)
@@ -729,6 +735,38 @@ cdef class PlasmaClient:
with nogil:
check_status(self.client.get().Subscribe(&self.notification_fd))
+    def get_notification_socket(self):
+        """
+        Get the notification socket.
+
+        Call ``subscribe`` first: the socket is built from the descriptor
+        that ``subscribe`` stored in ``notification_fd``.
+
+        Returns
+        -------
+        socket.socket
+            A Unix-domain stream socket from which raw notification messages
+            can be read and then passed to ``decode_notification``.
+        """
+        fd = self.notification_fd
+        return compat.get_socket_from_fd(fd, family=socket.AF_UNIX,
+                                         type=socket.SOCK_STREAM)
+
+    def decode_notification(self, const uint8_t* buf):
+        """
+        Get the notification from the buffer.
+
+        Parameters
+        ----------
+        buf : bytes
+            One notification message read from the notification socket,
+            without the length field that precedes it on the socket.
+
+        Returns
+        -------
+        ObjectID
+            The object ID of the object that was stored.
+        int
+            The data size of the object that was stored.
+        int
+            The metadata size of the object that was stored.
+        """
+        cdef:
+            CUniqueID decoded_id
+            int64_t decoded_data_size
+            int64_t decoded_metadata_size
+            CStatus status
+        # The C++ decode does not touch Python objects, so release the GIL.
+        with nogil:
+            status = self.client.get().DecodeNotification(
+                buf, &decoded_id, &decoded_data_size, &decoded_metadata_size)
+        check_status(status)
+        return (ObjectID(decoded_id.binary()), decoded_data_size,
+                decoded_metadata_size)
+
def get_next_notification(self):
"""
Get the next notification from the notification socket.
diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py
index a481db0..068d560 100644
--- a/python/pyarrow/compat.py
+++ b/python/pyarrow/compat.py
@@ -25,6 +25,7 @@ import numpy as np
import sys
import six
from six import BytesIO, StringIO, string_types as py_string
+import socket
PY26 = sys.version_info[:2] == (2, 6)
@@ -267,4 +268,13 @@ def import_pytorch_extension():
integer_types = six.integer_types + (np.integer,)
+
+def get_socket_from_fd(fileno, family, type):
+ if PY2:
+ socket_obj = socket.fromfd(fileno, family, type)
+ return socket.socket(family, type, _sock=socket_obj)
+ else:
+ return socket.socket(fileno=fileno, family=family, type=type)
+
+
__all__ = []
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 69b3d9c..e3d31b7 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -25,6 +25,7 @@ import os
import pytest
import random
import signal
+import struct
import subprocess
import sys
import time
@@ -742,6 +743,34 @@ class TestPlasmaClient(object):
assert data_sizes[j] == recv_dsize
assert metadata_sizes[j] == recv_msize
+ def test_subscribe_socket(self):
+ # Subscribe to notifications from the Plasma Store.
+ self.plasma_client.subscribe()
+ rsock = self.plasma_client.get_notification_socket()
+ for i in self.SUBSCRIBE_TEST_SIZES:
+ # Get notification from socket.
+ object_ids = [random_object_id() for _ in range(i)]
+ metadata_sizes = [np.random.randint(1000) for _ in range(i)]
+ data_sizes = [np.random.randint(1000) for _ in range(i)]
+
+ for j in range(i):
+ self.plasma_client.create(
+ object_ids[j], data_sizes[j],
+ metadata=bytearray(np.random.bytes(metadata_sizes[j])))
+ self.plasma_client.seal(object_ids[j])
+
+ # Check that we received notifications for all of the objects.
+ for j in range(i):
+ # Assume the plasma store will not be full,
+ # so we always get the data size instead of -1.
+ msg_len, = struct.unpack('L', rsock.recv(8))
+ content = rsock.recv(msg_len)
+ recv_objid, recv_dsize, recv_msize = (
+ self.plasma_client.decode_notification(content))
+ assert object_ids[j] == recv_objid
+ assert data_sizes[j] == recv_dsize
+ assert metadata_sizes[j] == recv_msize
+
def test_subscribe_deletions(self):
# Subscribe to notifications from the Plasma Store. We use
# plasma_client2 to make sure that all used objects will get evicted