You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/30 03:14:13 UTC

arrow git commit: ARROW-1264: [Python] Raise exception in Python instead of aborting if cannot connect to Plasma store

Repository: arrow
Updated Branches:
  refs/heads/master 4108bda82 -> 2288bfc18


ARROW-1264: [Python] Raise exception in Python instead of aborting if cannot connect to Plasma store

cc @pcmoritz @robertnishihara

Author: Wes McKinney <we...@twosigma.com>

Closes #912 from wesm/ARROW-1264 and squashes the following commits:

bd134d7e [Wes McKinney] Add flags to disable certain classes of unit tests
1d9de777 [Wes McKinney] Raise exception in Python instead of aborting if cannot connect to Plasma store


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2288bfc1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2288bfc1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2288bfc1

Branch: refs/heads/master
Commit: 2288bfc18fdbd6f50eb6c184d2349bcdd538f469
Parents: 4108bda
Author: Wes McKinney <we...@twosigma.com>
Authored: Sat Jul 29 23:14:09 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Jul 29 23:14:09 2017 -0400

----------------------------------------------------------------------
 cpp/src/plasma/client.cc            |  8 +++--
 cpp/src/plasma/client.h             |  4 ++-
 cpp/src/plasma/io.cc                | 30 +++++++++++-----
 cpp/src/plasma/io.h                 | 17 ++++++---
 python/manylinux1/build_arrow.sh    |  5 ++-
 python/pyarrow/plasma.pyx           | 60 ++++++++++++++++++++++----------
 python/pyarrow/tests/conftest.py    |  9 ++++-
 python/pyarrow/tests/test_plasma.py |  6 ++++
 8 files changed, 100 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index e14b3d9..8ea62c6 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -512,10 +512,12 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_
 }
 
 Status PlasmaClient::Connect(const std::string& store_socket_name,
-                             const std::string& manager_socket_name, int release_delay) {
-  store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
+                             const std::string& manager_socket_name, int release_delay,
+                             int num_retries) {
+  RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
   if (manager_socket_name != "") {
-    manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1);
+    RETURN_NOT_OK(
+        ConnectIpcSocketRetry(manager_socket_name, num_retries, -1, &manager_conn_));
   } else {
     manager_conn_ = -1;
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/client.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index cc05a06..50ec55f 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -89,9 +89,11 @@ class ARROW_EXPORT PlasmaClient {
   ///        function will not connect to a manager.
   /// @param release_delay Number of released objects that are kept around
   ///        and not evicted to avoid too many munmaps.
+  /// @param num_retries number of attempts to connect to IPC socket, default 50
   /// @return The return status.
   Status Connect(const std::string& store_socket_name,
-                 const std::string& manager_socket_name, int release_delay);
+                 const std::string& manager_socket_name, int release_delay,
+                 int num_retries = -1);
 
   /// Create an object in the Plasma Store. Any metadata for this object must be
   /// be passed in when the object is created.

http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
index e3b6b61..9bb4339 100644
--- a/cpp/src/plasma/io.cc
+++ b/cpp/src/plasma/io.cc
@@ -17,6 +17,11 @@
 
 #include "plasma/io.h"
 
+#include <cstdint>
+#include <sstream>
+
+#include "arrow/status.h"
+
 #include "plasma/common.h"
 
 using arrow::Status;
@@ -29,6 +34,8 @@ using arrow::Status;
 #define NUM_CONNECT_ATTEMPTS 50
 #define CONNECT_TIMEOUT_MS 100
 
+namespace plasma {
+
 Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
   ssize_t nbytes = 0;
   size_t bytesleft = length;
@@ -140,8 +147,8 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
   return socket_fd;
 }
 
-int connect_ipc_sock_retry(const std::string& pathname, int num_retries,
-                           int64_t timeout) {
+Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
+                             int64_t timeout, int* fd) {
   /* Pick the default values if the user did not specify. */
   if (num_retries < 0) {
     num_retries = NUM_CONNECT_ATTEMPTS;
@@ -150,23 +157,26 @@ int connect_ipc_sock_retry(const std::string& pathname, int num_retries,
     timeout = CONNECT_TIMEOUT_MS;
   }
 
-  int fd = -1;
+  *fd = -1;
   for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) {
-    fd = connect_ipc_sock(pathname);
-    if (fd >= 0) {
+    *fd = connect_ipc_sock(pathname);
+    if (*fd >= 0) {
       break;
     }
     if (num_attempts == 0) {
-      ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname;
+      ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
+                       << ", retrying " << num_retries << " times";
     }
     /* Sleep for timeout milliseconds. */
     usleep(static_cast<int>(timeout * 1000));
   }
   /* If we could not connect to the socket, exit. */
-  if (fd == -1) {
-    ARROW_LOG(FATAL) << "Could not connect to socket " << pathname;
+  if (*fd == -1) {
+    std::stringstream ss;
+    ss << "Could not connect to socket " << pathname;
+    return Status::IOError(ss.str());
   }
-  return fd;
+  return Status::OK();
 }
 
 int connect_ipc_sock(const std::string& pathname) {
@@ -224,3 +234,5 @@ uint8_t* read_message_async(int sock) {
   }
   return message;
 }
+
+}  // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
index 43c3fb5..ef96c06 100644
--- a/cpp/src/plasma/io.h
+++ b/cpp/src/plasma/io.h
@@ -34,22 +34,29 @@
 #define PLASMA_PROTOCOL_VERSION 0x0000000000000000
 #define DISCONNECT_CLIENT 0
 
-arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length);
+namespace plasma {
 
-arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);
+using arrow::Status;
 
-arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length);
+Status WriteBytes(int fd, uint8_t* cursor, size_t length);
 
-arrow::Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
+Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);
+
+Status ReadBytes(int fd, uint8_t* cursor, size_t length);
+
+Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
 
 int bind_ipc_sock(const std::string& pathname, bool shall_listen);
 
 int connect_ipc_sock(const std::string& pathname);
 
-int connect_ipc_sock_retry(const std::string& pathname, int num_retries, int64_t timeout);
+Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
+                             int64_t timeout, int* fd);
 
 int AcceptClient(int socket_fd);
 
 uint8_t* read_message_async(int sock);
 
+}  // namespace plasma
+
 #endif  // PLASMA_IO_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/manylinux1/build_arrow.sh
----------------------------------------------------------------------
diff --git a/python/manylinux1/build_arrow.sh b/python/manylinux1/build_arrow.sh
index 5725b2a..5a21e36 100755
--- a/python/manylinux1/build_arrow.sh
+++ b/python/manylinux1/build_arrow.sh
@@ -80,7 +80,10 @@ for PYTHON in ${PYTHON_VERSIONS}; do
     echo "=== (${PYTHON}) Testing manylinux1 wheel ==="
     source /venv-test-${PYTHON}/bin/activate
     pip install repaired_wheels/*.whl
-    py.test --parquet /venv-test-${PYTHON}/lib/*/site-packages/pyarrow
+
+    # ARROW-1264; for some reason the test case added causes a segfault inside
+    # the Docker container when writing and error message to stderr
+    py.test --parquet /venv-test-${PYTHON}/lib/*/site-packages/pyarrow -v -s --disable-plasma
     deactivate
 
     mv repaired_wheels/*.whl /io/dist

http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/plasma.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 8aaca99..dd62d47 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -70,7 +70,8 @@ cdef extern from "plasma/client.h" nogil:
         CPlasmaClient()
 
         CStatus Connect(const c_string& store_socket_name,
-                        const c_string& manager_socket_name, int release_delay)
+                        const c_string& manager_socket_name,
+                        int release_delay, int num_retries)
 
         CStatus Create(const CUniqueID& object_id, int64_t data_size,
                        const uint8_t* metadata, int64_t metadata_size,
@@ -98,10 +99,13 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Fetch(int num_object_ids, const CUniqueID* object_ids)
 
-        CStatus Wait(int64_t num_object_requests, CObjectRequest* object_requests,
-           int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
+        CStatus Wait(int64_t num_object_requests,
+                     CObjectRequest* object_requests,
+                     int num_ready_objects, int64_t timeout_ms,
+                     int* num_objects_ready);
 
-        CStatus Transfer(const char* addr, int port, const CUniqueID& object_id)
+        CStatus Transfer(const char* addr, int port,
+                         const CUniqueID& object_id)
 
 
 cdef extern from "plasma/client.h" nogil:
@@ -247,7 +251,8 @@ cdef class PlasmaClient:
     def manager_socket_name(self):
         return self.manager_socket_name.decode()
 
-    def create(self, ObjectID object_id, int64_t data_size, c_string metadata=b""):
+    def create(self, ObjectID object_id, int64_t data_size,
+               c_string metadata=b""):
         """
         Create a new buffer in the PlasmaStore for a particular object ID.
 
@@ -439,7 +444,8 @@ cdef class PlasmaClient:
         """
         cdef c_string addr = address.encode()
         with nogil:
-            check_status(self.client.get().Transfer(addr.c_str(), port, object_id.data))
+            check_status(self.client.get()
+                         .Transfer(addr.c_str(), port, object_id.data))
 
     def fetch(self, object_ids):
         """
@@ -457,7 +463,8 @@ cdef class PlasmaClient:
         with nogil:
             check_status(self.client.get().Fetch(ids.size(), ids.data()))
 
-    def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT, int num_returns=1):
+    def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT,
+             int num_returns=1):
         """
         Wait until num_returns objects in object_ids are ready.
         Currently, the object ID arguments to wait must be unique.
@@ -483,14 +490,18 @@ cdef class PlasmaClient:
         if len(object_ids) != len(set(object_ids)):
             raise Exception("Wait requires a list of unique object IDs.")
         cdef int64_t num_object_requests = len(object_ids)
-        cdef c_vector[CObjectRequest] object_requests = c_vector[CObjectRequest](num_object_requests)
+        cdef c_vector[CObjectRequest] object_requests = (
+            c_vector[CObjectRequest](num_object_requests))
         cdef int num_objects_ready = 0
         cdef ObjectID object_id
         for i, object_id in enumerate(object_ids):
             object_requests[i].object_id = object_id.data
             object_requests[i].type = PLASMA_QUERY_ANYWHERE
         with nogil:
-            check_status(self.client.get().Wait(num_object_requests, object_requests.data(), num_returns, timeout, &num_objects_ready))
+            check_status(self.client.get().Wait(num_object_requests,
+                                                object_requests.data(),
+                                                num_returns, timeout,
+                                                &num_objects_ready))
         cdef int num_to_return = min(num_objects_ready, num_returns);
         ready_ids = []
         waiting_ids = set(object_ids)
@@ -498,9 +509,12 @@ cdef class PlasmaClient:
         for i in range(len(object_ids)):
             if num_returned == num_to_return:
                 break
-            if object_requests[i].status == ObjectStatusLocal or object_requests[i].status == ObjectStatusRemote:
-                ready_ids.append(ObjectID(object_requests[i].object_id.binary()))
-                waiting_ids.discard(ObjectID(object_requests[i].object_id.binary()))
+            if (object_requests[i].status == ObjectStatusLocal or
+                object_requests[i].status == ObjectStatusRemote):
+                ready_ids.append(
+                    ObjectID(object_requests[i].object_id.binary()))
+                waiting_ids.discard(
+                    ObjectID(object_requests[i].object_id.binary()))
                 num_returned += 1
         return ready_ids, list(waiting_ids)
 
@@ -526,10 +540,11 @@ cdef class PlasmaClient:
         cdef int64_t data_size
         cdef int64_t metadata_size
         with nogil:
-            check_status(self.client.get().GetNotification(self.notification_fd,
-                                                           &object_id.data,
-                                                           &data_size,
-                                                           &metadata_size))
+            check_status(self.client.get()
+                         .GetNotification(self.notification_fd,
+                                          &object_id.data,
+                                          &data_size,
+                                          &metadata_size))
         return object_id, data_size, metadata_size
 
     def to_capsule(self):
@@ -542,7 +557,9 @@ cdef class PlasmaClient:
         with nogil:
             check_status(self.client.get().Disconnect())
 
-def connect(store_socket_name, manager_socket_name, int release_delay):
+
+def connect(store_socket_name, manager_socket_name, int release_delay,
+            int num_retries=-1):
     """
     Return a new PlasmaClient that is connected a plasma store and
     optionally a manager.
@@ -556,11 +573,16 @@ def connect(store_socket_name, manager_socket_name, int release_delay):
     release_delay : int
         The maximum number of objects that the client will keep and
         delay releasing (for caching reasons).
+    num_retries : int, default -1
+        Number of times tor ty to connect to plasma store. Default value of -1
+        uses the default (50)
     """
     cdef PlasmaClient result = PlasmaClient()
     result.store_socket_name = store_socket_name.encode()
     result.manager_socket_name = manager_socket_name.encode()
     with nogil:
-        check_status(result.client.get().Connect(result.store_socket_name,
-                     result.manager_socket_name, release_delay))
+        check_status(result.client.get()
+                     .Connect(result.store_socket_name,
+                              result.manager_socket_name,
+                              release_delay, num_retries))
     return result

http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/tests/conftest.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index f2d67f6..651438b 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -52,6 +52,11 @@ def pytest_addoption(parser):
                          help=('Enable the {0} test group'.format(group)))
 
     for group in groups:
+        parser.addoption('--disable-{0}'.format(group), action='store_true',
+                         default=False,
+                         help=('Disable the {0} test group'.format(group)))
+
+    for group in groups:
         parser.addoption('--only-{0}'.format(group), action='store_true',
                          default=False,
                          help=('Run only the {0} test group'.format(group)))
@@ -62,12 +67,14 @@ def pytest_runtest_setup(item):
 
     for group in groups:
         only_flag = '--only-{0}'.format(group)
+        disable_flag = '--disable-{0}'.format(group)
         flag = '--{0}'.format(group)
 
         if item.config.getoption(only_flag):
             only_set = True
         elif getattr(item.obj, group, None):
-            if not item.config.getoption(flag):
+            if (item.config.getoption(disable_flag) or
+                    not item.config.getoption(flag)):
                 skip('{0} NOT enabled'.format(flag))
 
     if only_set:

http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/tests/test_plasma.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index e168d9f..04162bb 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -168,6 +168,12 @@ class TestPlasmaClient(object):
         else:
             self.p.kill()
 
+    def test_connection_failure_raises_exception(self):
+        import pyarrow.plasma as plasma
+        # ARROW-1264
+        with pytest.raises(IOError):
+            plasma.connect('unknown-store-name', '', 0, 1)
+
     def test_create(self):
         # Create an object id string.
         object_id = random_object_id()