You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/30 03:14:13 UTC
arrow git commit: ARROW-1264: [Python] Raise exception in Python
instead of aborting if cannot connect to Plasma store
Repository: arrow
Updated Branches:
refs/heads/master 4108bda82 -> 2288bfc18
ARROW-1264: [Python] Raise exception in Python instead of aborting if cannot connect to Plasma store
cc @pcmoritz @robertnishihara
Author: Wes McKinney <we...@twosigma.com>
Closes #912 from wesm/ARROW-1264 and squashes the following commits:
bd134d7e [Wes McKinney] Add flags to disable certain classes of unit tests
1d9de777 [Wes McKinney] Raise exception in Python instead of aborting if cannot connect to Plasma store
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2288bfc1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2288bfc1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2288bfc1
Branch: refs/heads/master
Commit: 2288bfc18fdbd6f50eb6c184d2349bcdd538f469
Parents: 4108bda
Author: Wes McKinney <we...@twosigma.com>
Authored: Sat Jul 29 23:14:09 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Jul 29 23:14:09 2017 -0400
----------------------------------------------------------------------
cpp/src/plasma/client.cc | 8 +++--
cpp/src/plasma/client.h | 4 ++-
cpp/src/plasma/io.cc | 30 +++++++++++-----
cpp/src/plasma/io.h | 17 ++++++---
python/manylinux1/build_arrow.sh | 5 ++-
python/pyarrow/plasma.pyx | 60 ++++++++++++++++++++++----------
python/pyarrow/tests/conftest.py | 9 ++++-
python/pyarrow/tests/test_plasma.py | 6 ++++
8 files changed, 100 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index e14b3d9..8ea62c6 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -512,10 +512,12 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_
}
Status PlasmaClient::Connect(const std::string& store_socket_name,
- const std::string& manager_socket_name, int release_delay) {
- store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
+ const std::string& manager_socket_name, int release_delay,
+ int num_retries) {
+ RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
if (manager_socket_name != "") {
- manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1);
+ RETURN_NOT_OK(
+ ConnectIpcSocketRetry(manager_socket_name, num_retries, -1, &manager_conn_));
} else {
manager_conn_ = -1;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/client.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index cc05a06..50ec55f 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -89,9 +89,11 @@ class ARROW_EXPORT PlasmaClient {
/// function will not connect to a manager.
/// @param release_delay Number of released objects that are kept around
/// and not evicted to avoid too many munmaps.
+ /// @param num_retries number of attempts to connect to IPC socket; a negative value (the default, -1) means 50
/// @return The return status.
Status Connect(const std::string& store_socket_name,
- const std::string& manager_socket_name, int release_delay);
+ const std::string& manager_socket_name, int release_delay,
+ int num_retries = -1);
/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
index e3b6b61..9bb4339 100644
--- a/cpp/src/plasma/io.cc
+++ b/cpp/src/plasma/io.cc
@@ -17,6 +17,11 @@
#include "plasma/io.h"
+#include <cstdint>
+#include <sstream>
+
+#include "arrow/status.h"
+
#include "plasma/common.h"
using arrow::Status;
@@ -29,6 +34,8 @@ using arrow::Status;
#define NUM_CONNECT_ATTEMPTS 50
#define CONNECT_TIMEOUT_MS 100
+namespace plasma {
+
Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
ssize_t nbytes = 0;
size_t bytesleft = length;
@@ -140,8 +147,8 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
return socket_fd;
}
-int connect_ipc_sock_retry(const std::string& pathname, int num_retries,
- int64_t timeout) {
+Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
+ int64_t timeout, int* fd) {
/* Pick the default values if the user did not specify. */
if (num_retries < 0) {
num_retries = NUM_CONNECT_ATTEMPTS;
@@ -150,23 +157,26 @@ int connect_ipc_sock_retry(const std::string& pathname, int num_retries,
timeout = CONNECT_TIMEOUT_MS;
}
- int fd = -1;
+ *fd = -1;
for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) {
- fd = connect_ipc_sock(pathname);
- if (fd >= 0) {
+ *fd = connect_ipc_sock(pathname);
+ if (*fd >= 0) {
break;
}
if (num_attempts == 0) {
- ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname;
+ ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
+ << ", retrying " << num_retries << " times";
}
/* Sleep for timeout milliseconds. */
usleep(static_cast<int>(timeout * 1000));
}
/* If we could not connect to the socket, exit. */
- if (fd == -1) {
- ARROW_LOG(FATAL) << "Could not connect to socket " << pathname;
+ if (*fd == -1) {
+ std::stringstream ss;
+ ss << "Could not connect to socket " << pathname;
+ return Status::IOError(ss.str());
}
- return fd;
+ return Status::OK();
}
int connect_ipc_sock(const std::string& pathname) {
@@ -224,3 +234,5 @@ uint8_t* read_message_async(int sock) {
}
return message;
}
+
+} // namespace plasma
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
index 43c3fb5..ef96c06 100644
--- a/cpp/src/plasma/io.h
+++ b/cpp/src/plasma/io.h
@@ -34,22 +34,29 @@
#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
#define DISCONNECT_CLIENT 0
-arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length);
+namespace plasma {
-arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);
+using arrow::Status;
-arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length);
+Status WriteBytes(int fd, uint8_t* cursor, size_t length);
-arrow::Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
+Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);
+
+Status ReadBytes(int fd, uint8_t* cursor, size_t length);
+
+Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
int bind_ipc_sock(const std::string& pathname, bool shall_listen);
int connect_ipc_sock(const std::string& pathname);
-int connect_ipc_sock_retry(const std::string& pathname, int num_retries, int64_t timeout);
+Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
+ int64_t timeout, int* fd);
int AcceptClient(int socket_fd);
uint8_t* read_message_async(int sock);
+} // namespace plasma
+
#endif // PLASMA_IO_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/manylinux1/build_arrow.sh
----------------------------------------------------------------------
diff --git a/python/manylinux1/build_arrow.sh b/python/manylinux1/build_arrow.sh
index 5725b2a..5a21e36 100755
--- a/python/manylinux1/build_arrow.sh
+++ b/python/manylinux1/build_arrow.sh
@@ -80,7 +80,10 @@ for PYTHON in ${PYTHON_VERSIONS}; do
echo "=== (${PYTHON}) Testing manylinux1 wheel ==="
source /venv-test-${PYTHON}/bin/activate
pip install repaired_wheels/*.whl
- py.test --parquet /venv-test-${PYTHON}/lib/*/site-packages/pyarrow
+
+ # ARROW-1264; for some reason the test case added causes a segfault inside
+ # the Docker container when writing an error message to stderr
+ py.test --parquet /venv-test-${PYTHON}/lib/*/site-packages/pyarrow -v -s --disable-plasma
deactivate
mv repaired_wheels/*.whl /io/dist
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/plasma.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 8aaca99..dd62d47 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -70,7 +70,8 @@ cdef extern from "plasma/client.h" nogil:
CPlasmaClient()
CStatus Connect(const c_string& store_socket_name,
- const c_string& manager_socket_name, int release_delay)
+ const c_string& manager_socket_name,
+ int release_delay, int num_retries)
CStatus Create(const CUniqueID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
@@ -98,10 +99,13 @@ cdef extern from "plasma/client.h" nogil:
CStatus Fetch(int num_object_ids, const CUniqueID* object_ids)
- CStatus Wait(int64_t num_object_requests, CObjectRequest* object_requests,
- int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
+ CStatus Wait(int64_t num_object_requests,
+ CObjectRequest* object_requests,
+ int num_ready_objects, int64_t timeout_ms,
+ int* num_objects_ready);
- CStatus Transfer(const char* addr, int port, const CUniqueID& object_id)
+ CStatus Transfer(const char* addr, int port,
+ const CUniqueID& object_id)
cdef extern from "plasma/client.h" nogil:
@@ -247,7 +251,8 @@ cdef class PlasmaClient:
def manager_socket_name(self):
return self.manager_socket_name.decode()
- def create(self, ObjectID object_id, int64_t data_size, c_string metadata=b""):
+ def create(self, ObjectID object_id, int64_t data_size,
+ c_string metadata=b""):
"""
Create a new buffer in the PlasmaStore for a particular object ID.
@@ -439,7 +444,8 @@ cdef class PlasmaClient:
"""
cdef c_string addr = address.encode()
with nogil:
- check_status(self.client.get().Transfer(addr.c_str(), port, object_id.data))
+ check_status(self.client.get()
+ .Transfer(addr.c_str(), port, object_id.data))
def fetch(self, object_ids):
"""
@@ -457,7 +463,8 @@ cdef class PlasmaClient:
with nogil:
check_status(self.client.get().Fetch(ids.size(), ids.data()))
- def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT, int num_returns=1):
+ def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT,
+ int num_returns=1):
"""
Wait until num_returns objects in object_ids are ready.
Currently, the object ID arguments to wait must be unique.
@@ -483,14 +490,18 @@ cdef class PlasmaClient:
if len(object_ids) != len(set(object_ids)):
raise Exception("Wait requires a list of unique object IDs.")
cdef int64_t num_object_requests = len(object_ids)
- cdef c_vector[CObjectRequest] object_requests = c_vector[CObjectRequest](num_object_requests)
+ cdef c_vector[CObjectRequest] object_requests = (
+ c_vector[CObjectRequest](num_object_requests))
cdef int num_objects_ready = 0
cdef ObjectID object_id
for i, object_id in enumerate(object_ids):
object_requests[i].object_id = object_id.data
object_requests[i].type = PLASMA_QUERY_ANYWHERE
with nogil:
- check_status(self.client.get().Wait(num_object_requests, object_requests.data(), num_returns, timeout, &num_objects_ready))
+ check_status(self.client.get().Wait(num_object_requests,
+ object_requests.data(),
+ num_returns, timeout,
+ &num_objects_ready))
cdef int num_to_return = min(num_objects_ready, num_returns);
ready_ids = []
waiting_ids = set(object_ids)
@@ -498,9 +509,12 @@ cdef class PlasmaClient:
for i in range(len(object_ids)):
if num_returned == num_to_return:
break
- if object_requests[i].status == ObjectStatusLocal or object_requests[i].status == ObjectStatusRemote:
- ready_ids.append(ObjectID(object_requests[i].object_id.binary()))
- waiting_ids.discard(ObjectID(object_requests[i].object_id.binary()))
+ if (object_requests[i].status == ObjectStatusLocal or
+ object_requests[i].status == ObjectStatusRemote):
+ ready_ids.append(
+ ObjectID(object_requests[i].object_id.binary()))
+ waiting_ids.discard(
+ ObjectID(object_requests[i].object_id.binary()))
num_returned += 1
return ready_ids, list(waiting_ids)
@@ -526,10 +540,11 @@ cdef class PlasmaClient:
cdef int64_t data_size
cdef int64_t metadata_size
with nogil:
- check_status(self.client.get().GetNotification(self.notification_fd,
- &object_id.data,
- &data_size,
- &metadata_size))
+ check_status(self.client.get()
+ .GetNotification(self.notification_fd,
+ &object_id.data,
+ &data_size,
+ &metadata_size))
return object_id, data_size, metadata_size
def to_capsule(self):
@@ -542,7 +557,9 @@ cdef class PlasmaClient:
with nogil:
check_status(self.client.get().Disconnect())
-def connect(store_socket_name, manager_socket_name, int release_delay):
+
+def connect(store_socket_name, manager_socket_name, int release_delay,
+ int num_retries=-1):
"""
Return a new PlasmaClient that is connected a plasma store and
optionally a manager.
@@ -556,11 +573,16 @@ def connect(store_socket_name, manager_socket_name, int release_delay):
release_delay : int
The maximum number of objects that the client will keep and
delay releasing (for caching reasons).
+ num_retries : int, default -1
+ Number of times to try to connect to the plasma store. Default value of -1
+ uses the default (50).
"""
cdef PlasmaClient result = PlasmaClient()
result.store_socket_name = store_socket_name.encode()
result.manager_socket_name = manager_socket_name.encode()
with nogil:
- check_status(result.client.get().Connect(result.store_socket_name,
- result.manager_socket_name, release_delay))
+ check_status(result.client.get()
+ .Connect(result.store_socket_name,
+ result.manager_socket_name,
+ release_delay, num_retries))
return result
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/tests/conftest.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index f2d67f6..651438b 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -52,6 +52,11 @@ def pytest_addoption(parser):
help=('Enable the {0} test group'.format(group)))
for group in groups:
+ parser.addoption('--disable-{0}'.format(group), action='store_true',
+ default=False,
+ help=('Disable the {0} test group'.format(group)))
+
+ for group in groups:
parser.addoption('--only-{0}'.format(group), action='store_true',
default=False,
help=('Run only the {0} test group'.format(group)))
@@ -62,12 +67,14 @@ def pytest_runtest_setup(item):
for group in groups:
only_flag = '--only-{0}'.format(group)
+ disable_flag = '--disable-{0}'.format(group)
flag = '--{0}'.format(group)
if item.config.getoption(only_flag):
only_set = True
elif getattr(item.obj, group, None):
- if not item.config.getoption(flag):
+ if (item.config.getoption(disable_flag) or
+ not item.config.getoption(flag)):
skip('{0} NOT enabled'.format(flag))
if only_set:
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/tests/test_plasma.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index e168d9f..04162bb 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -168,6 +168,12 @@ class TestPlasmaClient(object):
else:
self.p.kill()
+ def test_connection_failure_raises_exception(self):
+ import pyarrow.plasma as plasma
+ # ARROW-1264
+ with pytest.raises(IOError):
+ plasma.connect('unknown-store-name', '', 0, 1)
+
def test_create(self):
# Create an object id string.
object_id = random_object_id()