You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2019/07/04 05:52:15 UTC
[arrow] 36/38: ARROW-4036: [C++] Pluggable Status message,
by exposing an abstract delegate class.
This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit ee286fe7a20c832d309a08f064a779f39591144a
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Wed Jul 3 12:14:41 2019 +0200
ARROW-4036: [C++] Pluggable Status message, by exposing an abstract delegate class.
This provides less "pluggability" but I think still offers a clean model for extension (subsystems can wrap the constructor for their purposes, and provide external static methods to check for particular types of errors).
Author: Micah Kornfield <em...@gmail.com>
Author: Antoine Pitrou <an...@python.org>
Closes #4484 from emkornfield/status_code_proposal and squashes the following commits:
4d1ab8d1d <Micah Kornfield> don't import plasma errors directly into top level pyarrow module
a66f999f8 <Micah Kornfield> make format
040216d48 <Micah Kornfield> fixes for comments outside python
729bba1ff <Antoine Pitrou> Fix Py2 issues (hopefully)
ea56d1e6a <Antoine Pitrou> Fix PythonErrorDetail to store Python error state (and restore it in check_status())
21e1b95ac <Micah Kornfield> fix compilation
9c905b094 <Micah Kornfield> fix lint
74d563cd7 <Micah Kornfield> fixes
85786efb1 <Micah Kornfield> change messages
3626a9016 <Micah Kornfield> try removing message
a4e6a1ff2 <Micah Kornfield> add logging for debug
4586fd1e2 <Micah Kornfield> fix typo
8f011b329 <Micah Kornfield> fix status propagation
317ea9c66 <Micah Kornfield> fix complie
9f5916070 <Micah Kornfield> don't make_shared inline
484b3a232 <Micah Kornfield> style fix
14e3467b5 <Micah Kornfield> dont rely on rtti
cd22df64d <Micah Kornfield> format
dec458506 <Micah Kornfield> not-quite pluggable error codes
---
c_glib/arrow-glib/error.cpp | 13 +-
c_glib/arrow-glib/error.h | 12 +-
c_glib/test/plasma/test-plasma-created-object.rb | 2 +-
cpp/src/arrow/compute/kernels/cast.cc | 2 +-
cpp/src/arrow/csv/column-builder.cc | 2 +-
cpp/src/arrow/python/common.cc | 180 ++++++++++++++++-------
cpp/src/arrow/python/common.h | 25 +++-
cpp/src/arrow/python/python-test.cc | 106 +++++++++----
cpp/src/arrow/python/serialize.cc | 2 +-
cpp/src/arrow/status-test.cc | 29 ++++
cpp/src/arrow/status.cc | 32 ++--
cpp/src/arrow/status.h | 78 ++++------
cpp/src/plasma/client.cc | 9 +-
cpp/src/plasma/common.cc | 81 ++++++++++
cpp/src/plasma/common.h | 17 +++
cpp/src/plasma/protocol.cc | 9 +-
cpp/src/plasma/test/client_tests.cc | 4 +-
cpp/src/plasma/test/serialization_tests.cc | 4 +-
python/pyarrow/__init__.py | 3 +-
python/pyarrow/_plasma.pyx | 94 ++++++++----
python/pyarrow/error.pxi | 29 +---
python/pyarrow/includes/common.pxd | 5 +-
python/pyarrow/includes/libarrow.pxd | 7 +
python/pyarrow/includes/libplasma.pxd | 25 ++++
python/pyarrow/plasma.py | 4 +-
python/pyarrow/tests/test_array.py | 2 +-
python/pyarrow/tests/test_convert_builtin.py | 73 +++++++--
python/pyarrow/tests/test_plasma.py | 4 +-
28 files changed, 586 insertions(+), 267 deletions(-)
diff --git a/c_glib/arrow-glib/error.cpp b/c_glib/arrow-glib/error.cpp
index a56b6ec..4c14615 100644
--- a/c_glib/arrow-glib/error.cpp
+++ b/c_glib/arrow-glib/error.cpp
@@ -65,22 +65,15 @@ garrow_error_code(const arrow::Status &status)
return GARROW_ERROR_NOT_IMPLEMENTED;
case arrow::StatusCode::SerializationError:
return GARROW_ERROR_SERIALIZATION;
- case arrow::StatusCode::PythonError:
- return GARROW_ERROR_PYTHON;
- case arrow::StatusCode::PlasmaObjectExists:
- return GARROW_ERROR_PLASMA_OBJECT_EXISTS;
- case arrow::StatusCode::PlasmaObjectNonexistent:
- return GARROW_ERROR_PLASMA_OBJECT_NONEXISTENT;
- case arrow::StatusCode::PlasmaStoreFull:
- return GARROW_ERROR_PLASMA_STORE_FULL;
- case arrow::StatusCode::PlasmaObjectAlreadySealed:
- return GARROW_ERROR_PLASMA_OBJECT_ALREADY_SEALED;
case arrow::StatusCode::CodeGenError:
return GARROW_ERROR_CODE_GENERATION;
case arrow::StatusCode::ExpressionValidationError:
return GARROW_ERROR_EXPRESSION_VALIDATION;
case arrow::StatusCode::ExecutionError:
return GARROW_ERROR_EXECUTION;
+ case arrow::StatusCode::AlreadyExists:
+ return GARROW_ERROR_ALREADY_EXISTS;
+
default:
return GARROW_ERROR_UNKNOWN;
}
diff --git a/c_glib/arrow-glib/error.h b/c_glib/arrow-glib/error.h
index 3dea9fc..2fac5ad 100644
--- a/c_glib/arrow-glib/error.h
+++ b/c_glib/arrow-glib/error.h
@@ -35,15 +35,11 @@ G_BEGIN_DECLS
* @GARROW_ERROR_UNKNOWN: Unknown error.
* @GARROW_ERROR_NOT_IMPLEMENTED: The feature is not implemented.
* @GARROW_ERROR_SERIALIZATION: Serialization error.
- * @GARROW_ERROR_PYTHON: Python error.
- * @GARROW_ERROR_PLASMA_OBJECT_EXISTS: Object already exists on Plasma.
- * @GARROW_ERROR_PLASMA_OBJECT_NONEXISTENT: Object doesn't exist on Plasma.
- * @GARROW_ERROR_PLASMA_STORE_FULL: Store full error on Plasma.
- * @GARROW_ERROR_PLASMA_OBJECT_ALREADY_SEALED: Object already sealed on Plasma.
* @GARROW_ERROR_CODE_GENERATION: Error generating code for expression evaluation
* in Gandiva.
* @GARROW_ERROR_EXPRESSION_VALIDATION: Validation errors in expression given for code generation.
* @GARROW_ERROR_EXECUTION: Execution error while evaluating the expression against a record batch.
+ * @GARROW_ERROR_ALREADY_EXISTS: Item already exists error.
*
* The error codes are used by all arrow-glib functions.
*
@@ -60,14 +56,10 @@ typedef enum {
GARROW_ERROR_UNKNOWN = 9,
GARROW_ERROR_NOT_IMPLEMENTED,
GARROW_ERROR_SERIALIZATION,
- GARROW_ERROR_PYTHON,
- GARROW_ERROR_PLASMA_OBJECT_EXISTS = 20,
- GARROW_ERROR_PLASMA_OBJECT_NONEXISTENT,
- GARROW_ERROR_PLASMA_STORE_FULL,
- GARROW_ERROR_PLASMA_OBJECT_ALREADY_SEALED,
GARROW_ERROR_CODE_GENERATION = 40,
GARROW_ERROR_EXPRESSION_VALIDATION = 41,
GARROW_ERROR_EXECUTION = 42,
+ GARROW_ERROR_ALREADY_EXISTS = 45,
} GArrowError;
#define GARROW_ERROR garrow_error_quark()
diff --git a/c_glib/test/plasma/test-plasma-created-object.rb b/c_glib/test/plasma/test-plasma-created-object.rb
index 9025ff4..857322d 100644
--- a/c_glib/test/plasma/test-plasma-created-object.rb
+++ b/c_glib/test/plasma/test-plasma-created-object.rb
@@ -45,7 +45,7 @@ class TestPlasmaCreatedObject < Test::Unit::TestCase
test("#abort") do
@object.data.set_data(0, @data)
- assert_raise(Arrow::Error::PlasmaObjectExists) do
+ assert_raise(Arrow::Error::AlreadyExists) do
@client.create(@id, @data.bytesize, @options)
end
@object.abort
diff --git a/cpp/src/arrow/compute/kernels/cast.cc b/cpp/src/arrow/compute/kernels/cast.cc
index 299ca80..93feb65 100644
--- a/cpp/src/arrow/compute/kernels/cast.cc
+++ b/cpp/src/arrow/compute/kernels/cast.cc
@@ -52,7 +52,7 @@
if (ARROW_PREDICT_FALSE(!_s.ok())) { \
std::stringstream ss; \
ss << __FILE__ << ":" << __LINE__ << " code: " << #s << "\n" << _s.message(); \
- ctx->SetStatus(Status(_s.code(), ss.str())); \
+ ctx->SetStatus(Status(_s.code(), ss.str(), s.detail())); \
return; \
} \
} while (0)
diff --git a/cpp/src/arrow/csv/column-builder.cc b/cpp/src/arrow/csv/column-builder.cc
index 657aa6f..4099507 100644
--- a/cpp/src/arrow/csv/column-builder.cc
+++ b/cpp/src/arrow/csv/column-builder.cc
@@ -76,7 +76,7 @@ class TypedColumnBuilder : public ColumnBuilder {
} else {
std::stringstream ss;
ss << "In column #" << col_index_ << ": " << st.message();
- return Status(st.code(), ss.str());
+ return Status(st.code(), ss.str(), st.detail());
}
}
diff --git a/cpp/src/arrow/python/common.cc b/cpp/src/arrow/python/common.cc
index aa44ec0..3cebc03 100644
--- a/cpp/src/arrow/python/common.cc
+++ b/cpp/src/arrow/python/common.cc
@@ -23,11 +23,15 @@
#include "arrow/memory_pool.h"
#include "arrow/status.h"
+#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/python/helpers.h"
namespace arrow {
+
+using internal::checked_cast;
+
namespace py {
static std::mutex memory_pool_mutex;
@@ -48,6 +52,129 @@ MemoryPool* get_memory_pool() {
}
// ----------------------------------------------------------------------
+// PythonErrorDetail
+
+namespace {
+
+const char kErrorDetailTypeId[] = "arrow::py::PythonErrorDetail";
+
+// Try to match the Python exception type with an appropriate Status code
+StatusCode MapPyError(PyObject* exc_type) {
+ StatusCode code;
+
+ if (PyErr_GivenExceptionMatches(exc_type, PyExc_MemoryError)) {
+ code = StatusCode::OutOfMemory;
+ } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_IndexError)) {
+ code = StatusCode::IndexError;
+ } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_KeyError)) {
+ code = StatusCode::KeyError;
+ } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_TypeError)) {
+ code = StatusCode::TypeError;
+ } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_ValueError) ||
+ PyErr_GivenExceptionMatches(exc_type, PyExc_OverflowError)) {
+ code = StatusCode::Invalid;
+ } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_EnvironmentError)) {
+ code = StatusCode::IOError;
+ } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_NotImplementedError)) {
+ code = StatusCode::NotImplemented;
+ } else {
+ code = StatusCode::UnknownError;
+ }
+ return code;
+}
+
+// PythonErrorDetail indicates a Python exception was raised.
+class PythonErrorDetail : public StatusDetail {
+ public:
+ const char* type_id() const override { return kErrorDetailTypeId; }
+
+ std::string ToString() const override {
+ // This is simple enough not to need the GIL
+ const auto ty = reinterpret_cast<const PyTypeObject*>(exc_type_.obj());
+ // XXX Should we also print traceback?
+ return std::string("Python exception: ") + ty->tp_name;
+ }
+
+ void RestorePyError() const {
+ Py_INCREF(exc_type_.obj());
+ Py_INCREF(exc_value_.obj());
+ Py_INCREF(exc_traceback_.obj());
+ PyErr_Restore(exc_type_.obj(), exc_value_.obj(), exc_traceback_.obj());
+ }
+
+ PyObject* exc_type() const { return exc_type_.obj(); }
+
+ PyObject* exc_value() const { return exc_value_.obj(); }
+
+ static std::shared_ptr<PythonErrorDetail> FromPyError() {
+ PyObject* exc_type = nullptr;
+ PyObject* exc_value = nullptr;
+ PyObject* exc_traceback = nullptr;
+
+ PyErr_Fetch(&exc_type, &exc_value, &exc_traceback);
+ PyErr_NormalizeException(&exc_type, &exc_value, &exc_traceback);
+ ARROW_CHECK(exc_type)
+ << "PythonErrorDetail::FromPyError called without a Python error set";
+ DCHECK(PyType_Check(exc_type));
+ DCHECK(exc_value); // Ensured by PyErr_NormalizeException, double-check
+ if (exc_traceback == nullptr) {
+ // Needed by PyErr_Restore()
+ Py_INCREF(Py_None);
+ exc_traceback = Py_None;
+ }
+
+ std::shared_ptr<PythonErrorDetail> detail(new PythonErrorDetail);
+ detail->exc_type_.reset(exc_type);
+ detail->exc_value_.reset(exc_value);
+ detail->exc_traceback_.reset(exc_traceback);
+ return detail;
+ }
+
+ protected:
+ PythonErrorDetail() = default;
+
+ OwnedRefNoGIL exc_type_, exc_value_, exc_traceback_;
+};
+
+} // namespace
+
+// ----------------------------------------------------------------------
+// Python exception <-> Status
+
+Status ConvertPyError(StatusCode code) {
+ auto detail = PythonErrorDetail::FromPyError();
+ if (code == StatusCode::UnknownError) {
+ code = MapPyError(detail->exc_type());
+ }
+
+ std::string message;
+ RETURN_NOT_OK(internal::PyObject_StdStringStr(detail->exc_value(), &message));
+ return Status(code, message, detail);
+}
+
+Status PassPyError() {
+ if (PyErr_Occurred()) {
+ return ConvertPyError();
+ }
+ return Status::OK();
+}
+
+bool IsPyError(const Status& status) {
+ if (status.ok()) {
+ return false;
+ }
+ auto detail = status.detail();
+ bool result = detail != nullptr && detail->type_id() == kErrorDetailTypeId;
+ return result;
+}
+
+void RestorePyError(const Status& status) {
+ ARROW_CHECK(IsPyError(status));
+ const auto& detail = checked_cast<const PythonErrorDetail&>(*status.detail());
+ detail.RestorePyError();
+}
+
+// ----------------------------------------------------------------------
// PyBuffer
PyBuffer::PyBuffer() : Buffer(nullptr, 0) {}
@@ -64,7 +191,7 @@ Status PyBuffer::Init(PyObject* obj) {
}
return Status::OK();
} else {
- return Status(StatusCode::PythonError, "");
+ return ConvertPyError(StatusCode::Invalid);
}
}
@@ -83,56 +210,5 @@ PyBuffer::~PyBuffer() {
}
}
-// ----------------------------------------------------------------------
-// Python exception -> Status
-
-Status ConvertPyError(StatusCode code) {
- PyObject* exc_type = nullptr;
- PyObject* exc_value = nullptr;
- PyObject* traceback = nullptr;
-
- PyErr_Fetch(&exc_type, &exc_value, &traceback);
- PyErr_NormalizeException(&exc_type, &exc_value, &traceback);
-
- DCHECK_NE(exc_type, nullptr) << "ConvertPyError called without an exception set";
-
- OwnedRef exc_type_ref(exc_type);
- OwnedRef exc_value_ref(exc_value);
- OwnedRef traceback_ref(traceback);
-
- std::string message;
- RETURN_NOT_OK(internal::PyObject_StdStringStr(exc_value, &message));
-
- if (code == StatusCode::UnknownError) {
- // Try to match the Python exception type with an appropriate Status code
- if (PyErr_GivenExceptionMatches(exc_type, PyExc_MemoryError)) {
- code = StatusCode::OutOfMemory;
- } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_IndexError)) {
- code = StatusCode::IndexError;
- } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_KeyError)) {
- code = StatusCode::KeyError;
- } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_TypeError)) {
- code = StatusCode::TypeError;
- } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_ValueError) ||
- PyErr_GivenExceptionMatches(exc_type, PyExc_OverflowError)) {
- code = StatusCode::Invalid;
- } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_EnvironmentError)) {
- code = StatusCode::IOError;
- } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_NotImplementedError)) {
- code = StatusCode::NotImplemented;
- }
- }
- return Status(code, message);
-}
-
-Status PassPyError() {
- if (PyErr_Occurred()) {
- // Do not call PyErr_Clear, the assumption is that someone further
- // up the call stack will want to deal with the Python error.
- return Status(StatusCode::PythonError, "");
- }
- return Status::OK();
-}
-
} // namespace py
} // namespace arrow
diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h
index 766b764..9d3dc0c 100644
--- a/cpp/src/arrow/python/common.h
+++ b/cpp/src/arrow/python/common.h
@@ -36,7 +36,15 @@ class Result;
namespace py {
+// Convert current Python error to a Status. The Python error state is cleared
+// and can be restored with RestorePyError().
ARROW_PYTHON_EXPORT Status ConvertPyError(StatusCode code = StatusCode::UnknownError);
+// Same as ConvertPyError(), but returns Status::OK() if no Python error is set.
+ARROW_PYTHON_EXPORT Status PassPyError();
+// Query whether the given Status is a Python error (as wrapped by ConvertPyError()).
+ARROW_PYTHON_EXPORT bool IsPyError(const Status& status);
+// Restore a Python error wrapped in a Status.
+ARROW_PYTHON_EXPORT void RestorePyError(const Status& status);
// Catch a pending Python exception and return the corresponding Status.
// If no exception is pending, Status::OK() is returned.
@@ -48,9 +56,6 @@ inline Status CheckPyError(StatusCode code = StatusCode::UnknownError) {
}
}
-ARROW_PYTHON_EXPORT Status PassPyError();
-
-// TODO(wesm): We can just let errors pass through. To be explored later
#define RETURN_IF_PYERROR() ARROW_RETURN_NOT_OK(CheckPyError());
#define PY_RETURN_IF_ERROR(CODE) ARROW_RETURN_NOT_OK(CheckPyError(CODE));
@@ -97,6 +102,18 @@ class ARROW_PYTHON_EXPORT PyAcquireGIL {
ARROW_DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL);
};
+// A RAII-style helper that releases the GIL until the end of a lexical block
+class ARROW_PYTHON_EXPORT PyReleaseGIL {
+ public:
+ PyReleaseGIL() { saved_state_ = PyEval_SaveThread(); }
+
+ ~PyReleaseGIL() { PyEval_RestoreThread(saved_state_); }
+
+ private:
+ PyThreadState* saved_state_;
+ ARROW_DISALLOW_COPY_AND_ASSIGN(PyReleaseGIL);
+};
+
// A helper to call safely into the Python interpreter from arbitrary C++ code.
// The GIL is acquired, and the current thread's error status is preserved.
template <typename Function>
@@ -109,7 +126,7 @@ Status SafeCallIntoPython(Function&& func) {
Status st = std::forward<Function>(func)();
// If the return Status is a "Python error", the current Python error status
// describes the error and shouldn't be clobbered.
- if (!st.IsPythonError() && exc_type != NULLPTR) {
+ if (!IsPyError(st) && exc_type != NULLPTR) {
PyErr_Restore(exc_type, exc_value, exc_traceback);
}
return st;
diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc
index 5de613f..5027d3f 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -40,21 +40,12 @@ using internal::checked_cast;
namespace py {
-TEST(PyBuffer, InvalidInputObject) {
- std::shared_ptr<Buffer> res;
- PyObject* input = Py_None;
- auto old_refcnt = Py_REFCNT(input);
- ASSERT_RAISES(PythonError, PyBuffer::FromPyObject(input, &res));
- PyErr_Clear();
- ASSERT_EQ(old_refcnt, Py_REFCNT(input));
-}
-
TEST(OwnedRef, TestMoves) {
- PyAcquireGIL lock;
std::vector<OwnedRef> vec;
PyObject *u, *v;
u = PyList_New(0);
v = PyList_New(0);
+
{
OwnedRef ref(u);
vec.push_back(std::move(ref));
@@ -66,31 +57,42 @@ TEST(OwnedRef, TestMoves) {
}
TEST(OwnedRefNoGIL, TestMoves) {
- std::vector<OwnedRefNoGIL> vec;
- PyObject *u, *v;
- {
- PyAcquireGIL lock;
- u = PyList_New(0);
- v = PyList_New(0);
- }
+ PyAcquireGIL lock;
+ lock.release();
+
{
- OwnedRefNoGIL ref(u);
- vec.push_back(std::move(ref));
- ASSERT_EQ(ref.obj(), nullptr);
+ std::vector<OwnedRef> vec;
+ PyObject *u, *v;
+ {
+ lock.acquire();
+ u = PyList_New(0);
+ v = PyList_New(0);
+ lock.release();
+ }
+ {
+ OwnedRefNoGIL ref(u);
+ vec.push_back(std::move(ref));
+ ASSERT_EQ(ref.obj(), nullptr);
+ }
+ vec.emplace_back(v);
+ ASSERT_EQ(Py_REFCNT(u), 1);
+ ASSERT_EQ(Py_REFCNT(v), 1);
}
- vec.emplace_back(v);
- ASSERT_EQ(Py_REFCNT(u), 1);
- ASSERT_EQ(Py_REFCNT(v), 1);
}
TEST(CheckPyError, TestStatus) {
- PyAcquireGIL lock;
Status st;
- auto check_error = [](Status& st, const char* expected_message = "some error") {
+ auto check_error = [](Status& st, const char* expected_message = "some error",
+ const char* expected_detail = nullptr) {
st = CheckPyError();
ASSERT_EQ(st.message(), expected_message);
ASSERT_FALSE(PyErr_Occurred());
+ if (expected_detail) {
+ auto detail = st.detail();
+ ASSERT_NE(detail, nullptr);
+ ASSERT_EQ(detail->ToString(), expected_detail);
+ }
};
for (PyObject* exc_type : {PyExc_Exception, PyExc_SyntaxError}) {
@@ -100,7 +102,7 @@ TEST(CheckPyError, TestStatus) {
}
PyErr_SetString(PyExc_TypeError, "some error");
- check_error(st);
+ check_error(st, "some error", "Python exception: TypeError");
ASSERT_TRUE(st.IsTypeError());
PyErr_SetString(PyExc_ValueError, "some error");
@@ -118,7 +120,7 @@ TEST(CheckPyError, TestStatus) {
}
PyErr_SetString(PyExc_NotImplementedError, "some error");
- check_error(st);
+ check_error(st, "some error", "Python exception: NotImplementedError");
ASSERT_TRUE(st.IsNotImplemented());
// No override if a specific status code is given
@@ -129,6 +131,52 @@ TEST(CheckPyError, TestStatus) {
ASSERT_FALSE(PyErr_Occurred());
}
+TEST(CheckPyError, TestStatusNoGIL) {
+ PyAcquireGIL lock;
+ {
+ Status st;
+ PyErr_SetString(PyExc_ZeroDivisionError, "zzzt");
+ st = ConvertPyError();
+ ASSERT_FALSE(PyErr_Occurred());
+ lock.release();
+ ASSERT_TRUE(st.IsUnknownError());
+ ASSERT_EQ(st.message(), "zzzt");
+ ASSERT_EQ(st.detail()->ToString(), "Python exception: ZeroDivisionError");
+ }
+}
+
+TEST(RestorePyError, Basics) {
+ PyErr_SetString(PyExc_ZeroDivisionError, "zzzt");
+ auto st = ConvertPyError();
+ ASSERT_FALSE(PyErr_Occurred());
+ ASSERT_TRUE(st.IsUnknownError());
+ ASSERT_EQ(st.message(), "zzzt");
+ ASSERT_EQ(st.detail()->ToString(), "Python exception: ZeroDivisionError");
+
+ RestorePyError(st);
+ ASSERT_TRUE(PyErr_Occurred());
+ PyObject* exc_type;
+ PyObject* exc_value;
+ PyObject* exc_traceback;
+ PyErr_Fetch(&exc_type, &exc_value, &exc_traceback);
+ ASSERT_TRUE(PyErr_GivenExceptionMatches(exc_type, PyExc_ZeroDivisionError));
+ std::string py_message;
+ ASSERT_OK(internal::PyObject_StdStringStr(exc_value, &py_message));
+ ASSERT_EQ(py_message, "zzzt");
+}
+
+TEST(PyBuffer, InvalidInputObject) {
+ std::shared_ptr<Buffer> res;
+ PyObject* input = Py_None;
+ auto old_refcnt = Py_REFCNT(input);
+ {
+ Status st = PyBuffer::FromPyObject(input, &res);
+ ASSERT_TRUE(IsPyError(st)) << st.ToString();
+ ASSERT_FALSE(PyErr_Occurred());
+ }
+ ASSERT_EQ(old_refcnt, Py_REFCNT(input));
+}
+
class DecimalTest : public ::testing::Test {
public:
DecimalTest() : lock_(), decimal_constructor_() {
@@ -253,8 +301,6 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) {
}
TEST(BuiltinConversionTest, TestMixedTypeFails) {
- PyAcquireGIL lock;
-
OwnedRef list_ref(PyList_New(3));
PyObject* list = list_ref.obj();
@@ -405,8 +451,6 @@ TEST_F(DecimalTest, TestMixedPrecisionAndScale) {
}
TEST_F(DecimalTest, TestMixedPrecisionAndScaleSequenceConvert) {
- PyAcquireGIL lock;
-
PyObject* value1 = this->CreatePythonDecimal("0.01").detach();
ASSERT_NE(value1, nullptr);
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index d93e395..5784394 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -332,8 +332,8 @@ Status SequenceBuilder::AppendDict(PyObject* context, PyObject* dict,
Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* elem,
PyObject** result) {
- *result = NULL;
if (context == Py_None) {
+ *result = NULL;
return Status::SerializationError("error while calling callback on ",
internal::PyObject_StdStringRepr(elem),
": handler not registered");
diff --git a/cpp/src/arrow/status-test.cc b/cpp/src/arrow/status-test.cc
index b7fc61f..b151e46 100644
--- a/cpp/src/arrow/status-test.cc
+++ b/cpp/src/arrow/status-test.cc
@@ -23,6 +23,16 @@
namespace arrow {
+namespace {
+
+class TestStatusDetail : public StatusDetail {
+ public:
+ const char* type_id() const override { return "type_id"; }
+ std::string ToString() const override { return "a specific detail message"; }
+};
+
+} // namespace
+
TEST(StatusTest, TestCodeAndMessage) {
Status ok = Status::OK();
ASSERT_EQ(StatusCode::OK, ok.code());
@@ -40,6 +50,25 @@ TEST(StatusTest, TestToString) {
ASSERT_EQ(file_error.ToString(), ss.str());
}
+TEST(StatusTest, TestToStringWithDetail) {
+ Status status(StatusCode::IOError, "summary", std::make_shared<TestStatusDetail>());
+ ASSERT_EQ("IOError: summary. Detail: a specific detail message", status.ToString());
+
+ std::stringstream ss;
+ ss << status;
+ ASSERT_EQ(status.ToString(), ss.str());
+}
+
+TEST(StatusTest, TestWithDetail) {
+ Status status(StatusCode::IOError, "summary");
+ auto detail = std::make_shared<TestStatusDetail>();
+ Status new_status = status.WithDetail(detail);
+
+ ASSERT_EQ(new_status.code(), status.code());
+ ASSERT_EQ(new_status.message(), status.message());
+ ASSERT_EQ(new_status.detail(), detail);
+}
+
TEST(StatusTest, AndStatus) {
Status a = Status::OK();
Status b = Status::OK();
diff --git a/cpp/src/arrow/status.cc b/cpp/src/arrow/status.cc
index cbb2911..785db45 100644
--- a/cpp/src/arrow/status.cc
+++ b/cpp/src/arrow/status.cc
@@ -21,11 +21,17 @@
namespace arrow {
-Status::Status(StatusCode code, const std::string& msg) {
+Status::Status(StatusCode code, const std::string& msg)
+ : Status::Status(code, msg, nullptr) {}
+
+Status::Status(StatusCode code, std::string msg, std::shared_ptr<StatusDetail> detail) {
ARROW_CHECK_NE(code, StatusCode::OK) << "Cannot construct ok status with message";
state_ = new State;
state_->code = code;
- state_->msg = msg;
+ state_->msg = std::move(msg);
+ if (detail != nullptr) {
+ state_->detail = std::move(detail);
+ }
}
void Status::CopyFrom(const Status& s) {
@@ -77,21 +83,6 @@ std::string Status::CodeAsString() const {
case StatusCode::SerializationError:
type = "Serialization error";
break;
- case StatusCode::PythonError:
- type = "Python error";
- break;
- case StatusCode::PlasmaObjectExists:
- type = "Plasma object exists";
- break;
- case StatusCode::PlasmaObjectNonexistent:
- type = "Plasma object is nonexistent";
- break;
- case StatusCode::PlasmaStoreFull:
- type = "Plasma store is full";
- break;
- case StatusCode::PlasmaObjectAlreadySealed:
- type = "Plasma object is already sealed";
- break;
case StatusCode::CodeGenError:
type = "CodeGenError in Gandiva";
break;
@@ -110,11 +101,16 @@ std::string Status::CodeAsString() const {
std::string Status::ToString() const {
std::string result(CodeAsString());
- if (state_ == NULL) {
+ if (state_ == nullptr) {
return result;
}
result += ": ";
result += state_->msg;
+ if (state_->detail != nullptr) {
+ result += ". Detail: ";
+ result += state_->detail->ToString();
+ }
+
return result;
}
diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h
index 1ed0da6..7cafc41 100644
--- a/cpp/src/arrow/status.h
+++ b/cpp/src/arrow/status.h
@@ -17,6 +17,7 @@
#include <cstring>
#include <iosfwd>
+#include <memory>
#include <string>
#include <utility>
@@ -85,17 +86,13 @@ enum class StatusCode : char {
UnknownError = 9,
NotImplemented = 10,
SerializationError = 11,
- PythonError = 12,
RError = 13,
- PlasmaObjectExists = 20,
- PlasmaObjectNonexistent = 21,
- PlasmaStoreFull = 22,
- PlasmaObjectAlreadySealed = 23,
- StillExecuting = 24,
// Gandiva range of errors
CodeGenError = 40,
ExpressionValidationError = 41,
- ExecutionError = 42
+ ExecutionError = 42,
+ // Continue generic codes.
+ AlreadyExists = 45
};
#if defined(__clang__)
@@ -103,6 +100,17 @@ enum class StatusCode : char {
class ARROW_MUST_USE_RESULT ARROW_EXPORT Status;
#endif
+/// \brief An opaque class that allows subsystems to retain
+/// additional information inside the Status.
+class ARROW_EXPORT StatusDetail {
+ public:
+ virtual ~StatusDetail() = default;
+ // Return a unique id for the type of the StatusDetail
+ // (effectively a poor man's substitute for RTTI).
+ virtual const char* type_id() const = 0;
+ virtual std::string ToString() const = 0;
+};
+
/// \brief Status outcome object (success or error)
///
/// The Status object is an object holding the outcome of an operation.
@@ -124,6 +132,8 @@ class ARROW_EXPORT Status {
}
Status(StatusCode code, const std::string& msg);
+ /// \brief Pluggable constructor for use by sub-systems. detail may be null.
+ Status(StatusCode code, std::string msg, std::shared_ptr<StatusDetail> detail);
// Copy the specified status.
inline Status(const Status& s);
@@ -222,32 +232,6 @@ class ARROW_EXPORT Status {
}
template <typename... Args>
- static Status PlasmaObjectExists(Args&&... args) {
- return Status(StatusCode::PlasmaObjectExists,
- util::StringBuilder(std::forward<Args>(args)...));
- }
-
- template <typename... Args>
- static Status PlasmaObjectNonexistent(Args&&... args) {
- return Status(StatusCode::PlasmaObjectNonexistent,
- util::StringBuilder(std::forward<Args>(args)...));
- }
-
- template <typename... Args>
- static Status PlasmaObjectAlreadySealed(Args&&... args) {
- return Status(StatusCode::PlasmaObjectAlreadySealed,
- util::StringBuilder(std::forward<Args>(args)...));
- }
-
- template <typename... Args>
- static Status PlasmaStoreFull(Args&&... args) {
- return Status(StatusCode::PlasmaStoreFull,
- util::StringBuilder(std::forward<Args>(args)...));
- }
-
- static Status StillExecuting() { return Status(StatusCode::StillExecuting, ""); }
-
- template <typename... Args>
static Status CodeGenError(Args&&... args) {
return Status(StatusCode::CodeGenError,
util::StringBuilder(std::forward<Args>(args)...));
@@ -290,22 +274,6 @@ class ARROW_EXPORT Status {
bool IsSerializationError() const { return code() == StatusCode::SerializationError; }
/// Return true iff the status indicates a R-originated error.
bool IsRError() const { return code() == StatusCode::RError; }
- /// Return true iff the status indicates a Python-originated error.
- bool IsPythonError() const { return code() == StatusCode::PythonError; }
- /// Return true iff the status indicates an already existing Plasma object.
- bool IsPlasmaObjectExists() const { return code() == StatusCode::PlasmaObjectExists; }
- /// Return true iff the status indicates a non-existent Plasma object.
- bool IsPlasmaObjectNonexistent() const {
- return code() == StatusCode::PlasmaObjectNonexistent;
- }
- /// Return true iff the status indicates an already sealed Plasma object.
- bool IsPlasmaObjectAlreadySealed() const {
- return code() == StatusCode::PlasmaObjectAlreadySealed;
- }
- /// Return true iff the status indicates the Plasma store reached its capacity limit.
- bool IsPlasmaStoreFull() const { return code() == StatusCode::PlasmaStoreFull; }
-
- bool IsStillExecuting() const { return code() == StatusCode::StillExecuting; }
bool IsCodeGenError() const { return code() == StatusCode::CodeGenError; }
@@ -330,6 +298,17 @@ class ARROW_EXPORT Status {
/// \brief Return the specific error message attached to this status.
std::string message() const { return ok() ? "" : state_->msg; }
+ /// \brief Return the status detail attached to this message.
+ std::shared_ptr<StatusDetail> detail() const {
+ return state_ == NULLPTR ? NULLPTR : state_->detail;
+ }
+
+ /// \brief Returns a new Status copying the existing status, but
+ /// replacing its detail with new_detail.
+ Status WithDetail(std::shared_ptr<StatusDetail> new_detail) {
+ return Status(code(), message(), std::move(new_detail));
+ }
+
[[noreturn]] void Abort() const;
[[noreturn]] void Abort(const std::string& message) const;
@@ -341,6 +320,7 @@ class ARROW_EXPORT Status {
struct State {
StatusCode code;
std::string msg;
+ std::shared_ptr<StatusDetail> detail;
};
// OK status has a `NULL` state_. Otherwise, `state_` points to
// a `State` structure containing the error code and message(s)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index ce9795d..a6cdf7f 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -791,11 +791,12 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
auto object_entry = objects_in_use_.find(object_id);
if (object_entry == objects_in_use_.end()) {
- return Status::PlasmaObjectNonexistent(
- "Seal() called on an object without a reference to it");
+ return MakePlasmaError(PlasmaErrorCode::PlasmaObjectNonexistent,
+ "Seal() called on an object without a reference to it");
}
if (object_entry->second->is_sealed) {
- return Status::PlasmaObjectAlreadySealed("Seal() called on an already sealed object");
+ return MakePlasmaError(PlasmaErrorCode::PlasmaObjectAlreadySealed,
+ "Seal() called on an already sealed object");
}
object_entry->second->is_sealed = true;
@@ -896,7 +897,7 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
RETURN_NOT_OK(Get({object_id}, 0, &object_buffers));
// If the object was not retrieved, return false.
if (!object_buffers[0].data) {
- return Status::PlasmaObjectNonexistent("Object not found");
+ return MakePlasmaError(PlasmaErrorCode::PlasmaObjectNonexistent, "Object not found");
}
// Compute the hash.
uint64_t hash = ComputeObjectHash(object_buffers[0]);
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index 0f1a0d1..bbcd2c9 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -18,6 +18,7 @@
#include "plasma/common.h"
#include <limits>
+#include <utility>
#include "arrow/util/ubsan.h"
@@ -27,8 +28,88 @@ namespace fb = plasma::flatbuf;
namespace plasma {
+namespace {
+
+const char kErrorDetailTypeId[] = "plasma::PlasmaStatusDetail";
+
+class PlasmaStatusDetail : public arrow::StatusDetail {
+ public:
+ explicit PlasmaStatusDetail(PlasmaErrorCode code) : code_(code) {}
+ const char* type_id() const override { return kErrorDetailTypeId; }
+ std::string ToString() const override {
+ const char* type;
+ switch (code()) {
+ case PlasmaErrorCode::PlasmaObjectExists:
+ type = "Plasma object exists";
+ break;
+ case PlasmaErrorCode::PlasmaObjectNonexistent:
+ type = "Plasma object is nonexistent";
+ break;
+ case PlasmaErrorCode::PlasmaStoreFull:
+ type = "Plasma store is full";
+ break;
+ case PlasmaErrorCode::PlasmaObjectAlreadySealed:
+ type = "Plasma object is already sealed";
+ break;
+ default:
+ type = "Unknown plasma error";
+ break;
+ }
+ return std::string(type);
+ }
+ PlasmaErrorCode code() const { return code_; }
+
+ private:
+ PlasmaErrorCode code_;
+};
+
+bool IsPlasmaStatus(const arrow::Status& status, PlasmaErrorCode code) {
+ if (status.ok()) {
+ return false;
+ }
+ auto* detail = status.detail().get();
+ return detail != nullptr && detail->type_id() == kErrorDetailTypeId &&
+ static_cast<PlasmaStatusDetail*>(detail)->code() == code;
+}
+
+} // namespace
+
using arrow::Status;
+arrow::Status MakePlasmaError(PlasmaErrorCode code, std::string message) {
+ arrow::StatusCode arrow_code = arrow::StatusCode::UnknownError;
+ switch (code) {
+ case PlasmaErrorCode::PlasmaObjectExists:
+ arrow_code = arrow::StatusCode::AlreadyExists;
+ break;
+ case PlasmaErrorCode::PlasmaObjectNonexistent:
+ arrow_code = arrow::StatusCode::KeyError;
+ break;
+ case PlasmaErrorCode::PlasmaStoreFull:
+ arrow_code = arrow::StatusCode::CapacityError;
+ break;
+ case PlasmaErrorCode::PlasmaObjectAlreadySealed:
+ // Maybe a stretch?
+ arrow_code = arrow::StatusCode::TypeError;
+ break;
+ }
+ return arrow::Status(arrow_code, std::move(message),
+ std::make_shared<PlasmaStatusDetail>(code));
+}
+
+bool IsPlasmaObjectExists(const arrow::Status& status) {
+ return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectExists);
+}
+bool IsPlasmaObjectNonexistent(const arrow::Status& status) {
+ return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectNonexistent);
+}
+bool IsPlasmaObjectAlreadySealed(const arrow::Status& status) {
+ return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectAlreadySealed);
+}
+bool IsPlasmaStoreFull(const arrow::Status& status) {
+ return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaStoreFull);
+}
+
UniqueID UniqueID::from_binary(const std::string& binary) {
UniqueID id;
std::memcpy(&id, binary.data(), sizeof(id));
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 6f4cef5..d42840c 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -41,6 +41,23 @@ namespace plasma {
enum class ObjectLocation : int32_t { Local, Remote, Nonexistent };
+enum class PlasmaErrorCode : int8_t {
+ PlasmaObjectExists = 1,
+ PlasmaObjectNonexistent = 2,
+ PlasmaStoreFull = 3,
+ PlasmaObjectAlreadySealed = 4,
+};
+
+ARROW_EXPORT arrow::Status MakePlasmaError(PlasmaErrorCode code, std::string message);
+/// Return true iff the status indicates an already existing Plasma object.
+ARROW_EXPORT bool IsPlasmaObjectExists(const arrow::Status& status);
+/// Return true iff the status indicates a non-existent Plasma object.
+ARROW_EXPORT bool IsPlasmaObjectNonexistent(const arrow::Status& status);
+/// Return true iff the status indicates an already sealed Plasma object.
+ARROW_EXPORT bool IsPlasmaObjectAlreadySealed(const arrow::Status& status);
+/// Return true iff the status indicates the Plasma store reached its capacity limit.
+ARROW_EXPORT bool IsPlasmaStoreFull(const arrow::Status& status);
+
constexpr int64_t kUniqueIDSize = 20;
class ARROW_EXPORT UniqueID {
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index b87656b..c22d77d 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -86,11 +86,14 @@ Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
case fb::PlasmaError::OK:
return Status::OK();
case fb::PlasmaError::ObjectExists:
- return Status::PlasmaObjectExists("object already exists in the plasma store");
+ return MakePlasmaError(PlasmaErrorCode::PlasmaObjectExists,
+ "object already exists in the plasma store");
case fb::PlasmaError::ObjectNonexistent:
- return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
+ return MakePlasmaError(PlasmaErrorCode::PlasmaObjectNonexistent,
+ "object does not exist in the plasma store");
case fb::PlasmaError::OutOfMemory:
- return Status::PlasmaStoreFull("object does not fit in the plasma store");
+ return MakePlasmaError(PlasmaErrorCode::PlasmaStoreFull,
+ "object does not fit in the plasma store");
default:
ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
}
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 435b687..deffde5 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -157,7 +157,7 @@ TEST_F(TestPlasmaStore, SealErrorsTest) {
ObjectID object_id = random_object_id();
Status result = client_.Seal(object_id);
- ASSERT_TRUE(result.IsPlasmaObjectNonexistent());
+ ASSERT_TRUE(IsPlasmaObjectNonexistent(result));
// Create object.
std::vector<uint8_t> data(100, 0);
@@ -165,7 +165,7 @@ TEST_F(TestPlasmaStore, SealErrorsTest) {
// Trying to seal it again.
result = client_.Seal(object_id);
- ASSERT_TRUE(result.IsPlasmaObjectAlreadySealed());
+ ASSERT_TRUE(IsPlasmaObjectAlreadySealed(result));
ARROW_CHECK_OK(client_.Release(object_id));
}
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 7e2bc88..f3cff42 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -156,7 +156,7 @@ TEST_F(TestPlasmaSerialization, SealReply) {
ObjectID object_id2;
Status s = ReadSealReply(data.data(), data.size(), &object_id2);
ASSERT_EQ(object_id1, object_id2);
- ASSERT_TRUE(s.IsPlasmaObjectExists());
+ ASSERT_TRUE(IsPlasmaObjectExists(s));
close(fd);
}
@@ -234,7 +234,7 @@ TEST_F(TestPlasmaSerialization, ReleaseReply) {
ObjectID object_id2;
Status s = ReadReleaseReply(data.data(), data.size(), &object_id2);
ASSERT_EQ(object_id1, object_id2);
- ASSERT_TRUE(s.IsPlasmaObjectExists());
+ ASSERT_TRUE(IsPlasmaObjectExists(s));
close(fd);
}
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index bbbd91a..1d508ed 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -122,8 +122,7 @@ from pyarrow.lib import (ArrowException,
ArrowMemoryError,
ArrowNotImplementedError,
ArrowTypeError,
- ArrowSerializationError,
- PlasmaObjectExists)
+ ArrowSerializationError)
# Serialization
from pyarrow.lib import (deserialize_from, deserialize,
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index e352377..7e994c3 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -37,8 +37,10 @@ import warnings
import pyarrow
from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer
+from pyarrow.lib import ArrowException
from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
CFixedSizeBufferWriter, CStatus)
+from pyarrow.includes.libplasma cimport *
from pyarrow import compat
@@ -255,6 +257,34 @@ cdef class PlasmaBuffer(Buffer):
self.client._release(self.object_id)
+class PlasmaObjectNonexistent(ArrowException):
+ pass
+
+
+class PlasmaStoreFull(ArrowException):
+ pass
+
+
+class PlasmaObjectExists(ArrowException):
+ pass
+
+
+cdef int plasma_check_status(const CStatus& status) nogil except -1:
+ if status.ok():
+ return 0
+
+ with gil:
+ message = compat.frombytes(status.message())
+ if IsPlasmaObjectExists(status):
+ raise PlasmaObjectExists(message)
+ elif IsPlasmaObjectNonexistent(status):
+ raise PlasmaObjectNonexistent(message)
+ elif IsPlasmaStoreFull(status):
+ raise PlasmaStoreFull(message)
+
+ return check_status(status)
+
+
cdef class PlasmaClient:
"""
The PlasmaClient is used to interface with a plasma store and manager.
@@ -283,7 +313,7 @@ cdef class PlasmaClient:
for object_id in object_ids:
ids.push_back(object_id.data)
with nogil:
- check_status(self.client.get().Get(ids, timeout_ms, result))
+ plasma_check_status(self.client.get().Get(ids, timeout_ms, result))
# XXX C++ API should instead expose some kind of CreateAuto()
cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data,
@@ -325,9 +355,10 @@ cdef class PlasmaClient:
"""
cdef shared_ptr[CBuffer] data
with nogil:
- check_status(self.client.get().Create(object_id.data, data_size,
- <uint8_t*>(metadata.data()),
- metadata.size(), &data))
+ plasma_check_status(
+ self.client.get().Create(object_id.data, data_size,
+ <uint8_t*>(metadata.data()),
+ metadata.size(), &data))
return self._make_mutable_plasma_buffer(object_id,
data.get().mutable_data(),
data_size)
@@ -358,8 +389,9 @@ cdef class PlasmaClient:
enough objects to create room for it.
"""
with nogil:
- check_status(self.client.get().CreateAndSeal(object_id.data, data,
- metadata))
+ plasma_check_status(
+ self.client.get().CreateAndSeal(object_id.data, data,
+ metadata))
def get_buffers(self, object_ids, timeout_ms=-1, with_meta=False):
"""
@@ -554,7 +586,7 @@ cdef class PlasmaClient:
A string used to identify an object.
"""
with nogil:
- check_status(self.client.get().Seal(object_id.data))
+ plasma_check_status(self.client.get().Seal(object_id.data))
def _release(self, ObjectID object_id):
"""
@@ -566,7 +598,7 @@ cdef class PlasmaClient:
A string used to identify an object.
"""
with nogil:
- check_status(self.client.get().Release(object_id.data))
+ plasma_check_status(self.client.get().Release(object_id.data))
def contains(self, ObjectID object_id):
"""
@@ -579,8 +611,8 @@ cdef class PlasmaClient:
"""
cdef c_bool is_contained
with nogil:
- check_status(self.client.get().Contains(object_id.data,
- &is_contained))
+ plasma_check_status(self.client.get().Contains(object_id.data,
+ &is_contained))
return is_contained
def hash(self, ObjectID object_id):
@@ -600,8 +632,8 @@ cdef class PlasmaClient:
"""
cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize)
with nogil:
- check_status(self.client.get().Hash(object_id.data,
- digest.data()))
+ plasma_check_status(self.client.get().Hash(object_id.data,
+ digest.data()))
return bytes(digest[:])
def evict(self, int64_t num_bytes):
@@ -617,13 +649,15 @@ cdef class PlasmaClient:
"""
cdef int64_t num_bytes_evicted = -1
with nogil:
- check_status(self.client.get().Evict(num_bytes, num_bytes_evicted))
+ plasma_check_status(
+ self.client.get().Evict(num_bytes, num_bytes_evicted))
return num_bytes_evicted
def subscribe(self):
"""Subscribe to notifications about sealed objects."""
with nogil:
- check_status(self.client.get().Subscribe(&self.notification_fd))
+ plasma_check_status(
+ self.client.get().Subscribe(&self.notification_fd))
def get_notification_socket(self):
"""
@@ -650,11 +684,11 @@ cdef class PlasmaClient:
cdef int64_t data_size
cdef int64_t metadata_size
with nogil:
- check_status(self.client.get()
- .DecodeNotification(buf,
- &object_id,
- &data_size,
- &metadata_size))
+ status = self.client.get().DecodeNotification(buf,
+ &object_id,
+ &data_size,
+ &metadata_size)
+ plasma_check_status(status)
return ObjectID(object_id.binary()), data_size, metadata_size
def get_next_notification(self):
@@ -674,11 +708,11 @@ cdef class PlasmaClient:
cdef int64_t data_size
cdef int64_t metadata_size
with nogil:
- check_status(self.client.get()
- .GetNotification(self.notification_fd,
- &object_id.data,
- &data_size,
- &metadata_size))
+ status = self.client.get().GetNotification(self.notification_fd,
+ &object_id.data,
+ &data_size,
+ &metadata_size)
+ plasma_check_status(status)
return object_id, data_size, metadata_size
def to_capsule(self):
@@ -689,7 +723,7 @@ cdef class PlasmaClient:
Disconnect this client from the Plasma store.
"""
with nogil:
- check_status(self.client.get().Disconnect())
+ plasma_check_status(self.client.get().Disconnect())
def delete(self, object_ids):
"""
@@ -705,7 +739,7 @@ cdef class PlasmaClient:
for object_id in object_ids:
ids.push_back(object_id.data)
with nogil:
- check_status(self.client.get().Delete(ids))
+ plasma_check_status(self.client.get().Delete(ids))
def list(self):
"""
@@ -738,7 +772,7 @@ cdef class PlasmaClient:
"""
cdef CObjectTable objects
with nogil:
- check_status(self.client.get().List(&objects))
+ plasma_check_status(self.client.get().List(&objects))
result = dict()
cdef ObjectID object_id
cdef CObjectTableEntry entry
@@ -802,7 +836,7 @@ def connect(store_socket_name, manager_socket_name=None, int release_delay=0,
warnings.warn("release_delay in PlasmaClient.connect is deprecated",
FutureWarning)
with nogil:
- check_status(result.client.get()
- .Connect(result.store_socket_name, b"",
- release_delay, num_retries))
+ plasma_check_status(
+ result.client.get().Connect(result.store_socket_name, b"",
+ release_delay, num_retries))
return result
diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi
index 7b5e8d4..3cb9142 100644
--- a/python/pyarrow/error.pxi
+++ b/python/pyarrow/error.pxi
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from pyarrow.includes.libarrow cimport CStatus
+from pyarrow.includes.libarrow cimport CStatus, IsPyError, RestorePyError
from pyarrow.includes.common cimport c_string
from pyarrow.compat import frombytes
@@ -56,30 +56,21 @@ class ArrowIndexError(IndexError, ArrowException):
pass
-class PlasmaObjectExists(ArrowException):
- pass
-
-
-class PlasmaObjectNonexistent(ArrowException):
- pass
-
-
-class PlasmaStoreFull(ArrowException):
- pass
-
-
class ArrowSerializationError(ArrowException):
pass
+# This function could be written directly in C++ if we didn't
+# define Arrow-specific subclasses (ArrowInvalid etc.)
cdef int check_status(const CStatus& status) nogil except -1:
if status.ok():
return 0
- if status.IsPythonError():
- return -1
-
with gil:
+ if IsPyError(status):
+ RestorePyError(status)
+ return -1
+
message = frombytes(status.message())
if status.IsInvalid():
raise ArrowInvalid(message)
@@ -97,12 +88,6 @@ cdef int check_status(const CStatus& status) nogil except -1:
raise ArrowCapacityError(message)
elif status.IsIndexError():
raise ArrowIndexError(message)
- elif status.IsPlasmaObjectExists():
- raise PlasmaObjectExists(message)
- elif status.IsPlasmaObjectNonexistent():
- raise PlasmaObjectNonexistent(message)
- elif status.IsPlasmaStoreFull():
- raise PlasmaStoreFull(message)
elif status.IsSerializationError():
raise ArrowSerializationError(message)
else:
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index 4a06fc8..8b116f6 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -42,6 +42,7 @@ cdef extern from "numpy/halffloat.h":
cdef extern from "arrow/api.h" namespace "arrow" nogil:
# We can later add more of the common status factory methods as needed
cdef CStatus CStatus_OK "arrow::Status::OK"()
+
cdef CStatus CStatus_Invalid "arrow::Status::Invalid"()
cdef CStatus CStatus_NotImplemented \
"arrow::Status::NotImplemented"(const c_string& msg)
@@ -64,10 +65,6 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
c_bool IsCapacityError()
c_bool IsIndexError()
c_bool IsSerializationError()
- c_bool IsPythonError()
- c_bool IsPlasmaObjectExists()
- c_bool IsPlasmaObjectNonexistent()
- c_bool IsPlasmaStoreFull()
cdef extern from "arrow/result.h" namespace "arrow::internal" nogil:
cdef cppclass CResult[T]:
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 93a7594..89199ca 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1217,6 +1217,8 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
CMemoryPool* pool
c_bool from_pandas
+ # TODO Some functions below are not actually "nogil"
+
CStatus ConvertPySequence(object obj, object mask,
const PyConversionOptions& options,
shared_ptr[CChunkedArray]* out)
@@ -1342,6 +1344,11 @@ cdef extern from 'arrow/python/init.h':
int arrow_init_numpy() except -1
+cdef extern from 'arrow/python/common.h' namespace "arrow::py":
+ c_bool IsPyError(const CStatus& status)
+ void RestorePyError(const CStatus& status)
+
+
cdef extern from 'arrow/python/pyarrow.h' namespace 'arrow::py':
int import_pyarrow() except -1
diff --git a/python/pyarrow/includes/libplasma.pxd b/python/pyarrow/includes/libplasma.pxd
new file mode 100644
index 0000000..1b84ab4
--- /dev/null
+++ b/python/pyarrow/includes/libplasma.pxd
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.common cimport *
+
+cdef extern from "plasma/common.h" namespace "plasma" nogil:
+ cdef c_bool IsPlasmaObjectExists(const CStatus& status)
+ cdef c_bool IsPlasmaObjectNonexistent(const CStatus& status)
+ cdef c_bool IsPlasmaStoreFull(const CStatus& status)
diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py
index 748de97..43ca471 100644
--- a/python/pyarrow/plasma.py
+++ b/python/pyarrow/plasma.py
@@ -27,7 +27,9 @@ import tempfile
import time
from pyarrow._plasma import (ObjectID, ObjectNotAvailable, # noqa
- PlasmaBuffer, PlasmaClient, connect)
+ PlasmaBuffer, PlasmaClient, connect,
+ PlasmaObjectExists, PlasmaObjectNonexistent,
+ PlasmaStoreFull)
# The Plasma TensorFlow Operator needs to be compiled on the end user's
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index 9d66d96..f961c00 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -1479,7 +1479,7 @@ def test_array_masked():
def test_array_from_large_pyints():
# ARROW-5430
- with pytest.raises(pa.ArrowInvalid):
+ with pytest.raises(OverflowError):
# too large for int64 so dtype must be explicitly provided
pa.array([int(2 ** 63)])
diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py
index 4e04083..81d5952 100644
--- a/python/pyarrow/tests/test_convert_builtin.py
+++ b/python/pyarrow/tests/test_convert_builtin.py
@@ -26,9 +26,12 @@ import collections
import datetime
import decimal
import itertools
+import traceback
+import sys
+
import numpy as np
-import six
import pytz
+import six
int_type_pairs = [
@@ -53,6 +56,19 @@ class StrangeIterable:
return self.lst.__iter__()
+class MyInt:
+ def __init__(self, value):
+ self.value = value
+
+ def __int__(self):
+ return self.value
+
+
+class MyBrokenInt:
+ def __int__(self):
+ 1/0 # MARKER
+
+
def check_struct_type(ty, expected):
"""
Check a struct type is as expected, but not taking order into account.
@@ -191,7 +207,7 @@ def test_nested_lists(seq):
@parametrize_with_iterable_types
def test_list_with_non_list(seq):
# List types don't accept non-sequences
- with pytest.raises(pa.ArrowTypeError):
+ with pytest.raises(TypeError):
pa.array(seq([[], [1, 2], 3]), type=pa.list_(pa.int64()))
@@ -299,6 +315,24 @@ def test_sequence_numpy_integer_inferred(seq, np_scalar_pa_type):
assert arr.to_pylist() == expected
+@parametrize_with_iterable_types
+def test_sequence_custom_integers(seq):
+ expected = [0, 42, 2**33 + 1, -2**63]
+ data = list(map(MyInt, expected))
+ arr = pa.array(seq(data), type=pa.int64())
+ assert arr.to_pylist() == expected
+
+
+@parametrize_with_iterable_types
+def test_broken_integers(seq):
+ data = [MyBrokenInt()]
+ with pytest.raises(ZeroDivisionError) as exc_info:
+ pa.array(seq(data), type=pa.int64())
+ # Original traceback is kept
+ tb_lines = traceback.format_tb(exc_info.tb)
+ assert "# MARKER" in tb_lines[-1]
+
+
def test_numpy_scalars_mixed_type():
# ARROW-4324
data = [np.int32(10), np.float32(0.5)]
@@ -308,7 +342,7 @@ def test_numpy_scalars_mixed_type():
@pytest.mark.xfail(reason="Type inference for uint64 not implemented",
- raises=pa.ArrowException)
+ raises=OverflowError)
def test_uint64_max_convert():
data = [0, np.iinfo(np.uint64).max]
@@ -323,20 +357,20 @@ def test_uint64_max_convert():
@pytest.mark.parametrize("bits", [8, 16, 32, 64])
def test_signed_integer_overflow(bits):
ty = getattr(pa, "int%d" % bits)()
- # XXX ideally would raise OverflowError
- with pytest.raises((ValueError, pa.ArrowException)):
+ # XXX ideally would always raise OverflowError
+ with pytest.raises((OverflowError, pa.ArrowInvalid)):
pa.array([2 ** (bits - 1)], ty)
- with pytest.raises((ValueError, pa.ArrowException)):
+ with pytest.raises((OverflowError, pa.ArrowInvalid)):
pa.array([-2 ** (bits - 1) - 1], ty)
@pytest.mark.parametrize("bits", [8, 16, 32, 64])
def test_unsigned_integer_overflow(bits):
ty = getattr(pa, "uint%d" % bits)()
- # XXX ideally would raise OverflowError
- with pytest.raises((ValueError, pa.ArrowException)):
+ # XXX ideally would always raise OverflowError
+ with pytest.raises((OverflowError, pa.ArrowInvalid)):
pa.array([2 ** bits], ty)
- with pytest.raises((ValueError, pa.ArrowException)):
+ with pytest.raises((OverflowError, pa.ArrowInvalid)):
pa.array([-1], ty)
@@ -661,7 +695,7 @@ def test_sequence_explicit_types(input):
def test_date32_overflow():
# Overflow
data3 = [2**32, None]
- with pytest.raises(pa.ArrowException):
+ with pytest.raises((OverflowError, pa.ArrowException)):
pa.array(data3, type=pa.date32())
@@ -831,12 +865,19 @@ def test_sequence_timestamp_from_int_with_unit():
assert repr(arr_ns[0]) == "Timestamp('1970-01-01 00:00:00.000000001')"
assert str(arr_ns[0]) == "1970-01-01 00:00:00.000000001"
- with pytest.raises(pa.ArrowException):
- class CustomClass():
- pass
- pa.array([1, CustomClass()], type=ns)
- pa.array([1, CustomClass()], type=pa.date32())
- pa.array([1, CustomClass()], type=pa.date64())
+ if sys.version_info >= (3,):
+ expected_exc = TypeError
+ else:
+ # Can have "AttributeError: CustomClass instance
+ # has no attribute '__trunc__'"
+ expected_exc = (TypeError, AttributeError)
+
+ class CustomClass():
+ pass
+
+ for ty in [ns, pa.date32(), pa.date64()]:
+ with pytest.raises(expected_exc):
+ pa.array([1, CustomClass()], type=ty)
def test_sequence_nesting_levels():
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 149bdd5..49808a1 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -227,7 +227,7 @@ class TestPlasmaClient(object):
# Make sure that creating the same object twice raises an exception.
object_id = random_object_id()
self.plasma_client.create_and_seal(object_id, b'a', b'b')
- with pytest.raises(pa.PlasmaObjectExists):
+ with pytest.raises(pa.plasma.PlasmaObjectExists):
self.plasma_client.create_and_seal(object_id, b'a', b'b')
# Make sure that these objects can be evicted.
@@ -852,7 +852,7 @@ class TestPlasmaClient(object):
for _ in range(2):
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
# Verify that an object that is too large does not fit.
- with pytest.raises(pa.lib.PlasmaStoreFull):
+ with pytest.raises(pa.plasma.PlasmaStoreFull):
create_object(self.plasma_client2,
DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)