You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/01/30 15:31:57 UTC
[arrow] 01/01: ARROW-2033: [Python] Fix pa.array() with iterator input
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 40dd9cc25a46aa56a5d852fbc8ebdbc55b5fe8d6
Author: Antoine Pitrou <pi...@free.fr>
AuthorDate: Tue Jan 30 10:16:51 2018 -0500
ARROW-2033: [Python] Fix pa.array() with iterator input
Iterator (not iterable) input was broken with pa.array() unless both type and size were explicitly passed.
Author: Wes McKinney <we...@twosigma.com>
Author: Antoine Pitrou <pi...@free.fr>
Closes #1513 from pitrou/ARROW-2033-pa-array-iterator and squashes the following commits:
0013889a [Wes McKinney] Code review comments
dc95be29 [Antoine Pitrou] Fix pyarrow.array with iterator input
Change-Id: I930c3309e3fde12e65ede066b47985f67f7f4037
---
cpp/src/arrow/python/builtin_convert.cc | 174 ++++++++++++++-------------
cpp/src/arrow/python/builtin_convert.h | 18 ++-
python/pyarrow/array.pxi | 15 ++-
python/pyarrow/includes/libarrow.pxd | 13 +-
python/pyarrow/tests/test_convert_builtin.py | 19 +++
5 files changed, 141 insertions(+), 98 deletions(-)
diff --git a/cpp/src/arrow/python/builtin_convert.cc b/cpp/src/arrow/python/builtin_convert.cc
index f7a370c..b41c55d 100644
--- a/cpp/src/arrow/python/builtin_convert.cc
+++ b/cpp/src/arrow/python/builtin_convert.cc
@@ -172,38 +172,26 @@ class SeqVisitor {
Status Visit(PyObject* obj, int level = 0) {
max_nesting_level_ = std::max(max_nesting_level_, level);
- // Loop through either a sequence or an iterator.
- if (PySequence_Check(obj)) {
- Py_ssize_t size = PySequence_Size(obj);
- for (int64_t i = 0; i < size; ++i) {
- OwnedRef ref;
- if (PyArray_Check(obj)) {
- auto array = reinterpret_cast<PyArrayObject*>(obj);
- auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i));
-
- ref.reset(PyArray_GETITEM(array, ptr));
- RETURN_IF_PYERROR();
+ // Loop through a sequence
+ if (!PySequence_Check(obj))
+ return Status::TypeError("Object is not a sequence or iterable");
- RETURN_NOT_OK(VisitElem(ref, level));
- } else {
- ref.reset(PySequence_GetItem(obj, i));
- RETURN_IF_PYERROR();
- RETURN_NOT_OK(VisitElem(ref, level));
- }
- }
- } else if (PyObject_HasAttrString(obj, "__iter__")) {
- OwnedRef iter(PyObject_GetIter(obj));
- RETURN_IF_PYERROR();
+ Py_ssize_t size = PySequence_Size(obj);
+ for (int64_t i = 0; i < size; ++i) {
+ OwnedRef ref;
+ if (PyArray_Check(obj)) {
+ auto array = reinterpret_cast<PyArrayObject*>(obj);
+ auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i));
- PyObject* item = NULLPTR;
- while ((item = PyIter_Next(iter.obj()))) {
+ ref.reset(PyArray_GETITEM(array, ptr));
RETURN_IF_PYERROR();
- OwnedRef ref(item);
+ RETURN_NOT_OK(VisitElem(ref, level));
+ } else {
+ ref.reset(PySequence_GetItem(obj, i));
+ RETURN_IF_PYERROR();
RETURN_NOT_OK(VisitElem(ref, level));
}
- } else {
- return Status::TypeError("Object is not a sequence or iterable");
}
return Status::OK();
}
@@ -285,25 +273,45 @@ class SeqVisitor {
}
};
-Status InferArrowSize(PyObject* obj, int64_t* size) {
+// Convert *obj* to a sequence if necessary
+// Fill *size* to its length. If >= 0 on entry, *size* is an upper size
+// bound that may lead to truncation.
+Status ConvertToSequenceAndInferSize(PyObject* obj, PyObject** seq, int64_t* size) {
if (PySequence_Check(obj)) {
- *size = static_cast<int64_t>(PySequence_Size(obj));
- } else if (PyObject_HasAttrString(obj, "__iter__")) {
+ // obj is already a sequence
+ int64_t real_size = static_cast<int64_t>(PySequence_Size(obj));
+ if (*size < 0) {
+ *size = real_size;
+ } else {
+ *size = std::min(real_size, *size);
+ }
+ Py_INCREF(obj);
+ *seq = obj;
+ } else if (*size < 0) {
+ // unknown size, exhaust iterator
+ *seq = PySequence_List(obj);
+ RETURN_IF_PYERROR();
+ *size = static_cast<int64_t>(PyList_GET_SIZE(*seq));
+ } else {
+ // size is known but iterator could be infinite
+ Py_ssize_t i, n = *size;
PyObject* iter = PyObject_GetIter(obj);
+ RETURN_IF_PYERROR();
OwnedRef iter_ref(iter);
- *size = 0;
- PyObject* item;
- while ((item = PyIter_Next(iter))) {
- OwnedRef item_ref(item);
- *size += 1;
+ PyObject* lst = PyList_New(n);
+ RETURN_IF_PYERROR();
+ for (i = 0; i < n; i++) {
+ PyObject* item = PyIter_Next(iter);
+ if (!item) break;
+ PyList_SET_ITEM(lst, i, item);
}
- } else {
- return Status::TypeError("Object is not a sequence or iterable");
- }
- if (PyErr_Occurred()) {
- // Not a sequence
- PyErr_Clear();
- return Status::TypeError("Object is not a sequence or iterable");
+ // Shrink list if len(iterator) < size
+ if (i < n && PyList_SetSlice(lst, i, n, NULL)) {
+ Py_DECREF(lst);
+ return Status::UnknownError("failed to resize list");
+ }
+ *seq = lst;
+ *size = std::min<int64_t>(i, *size);
}
return Status::OK();
}
@@ -325,7 +333,10 @@ Status InferArrowType(PyObject* obj, std::shared_ptr<DataType>* out_type) {
Status InferArrowTypeAndSize(PyObject* obj, int64_t* size,
std::shared_ptr<DataType>* out_type) {
- RETURN_NOT_OK(InferArrowSize(obj, size));
+ if (!PySequence_Check(obj)) {
+ return Status::TypeError("Object is not a sequence");
+ }
+ *size = static_cast<int64_t>(PySequence_Size(obj));
// For 0-length sequences, refuse to guess
if (*size == 0) {
@@ -382,27 +393,8 @@ class TypedConverterVisitor : public TypedConverter<BuilderType> {
RETURN_NOT_OK(static_cast<Derived*>(this)->AppendItem(ref));
}
}
- } else if (PyObject_HasAttrString(obj, "__iter__")) {
- PyObject* iter = PyObject_GetIter(obj);
- OwnedRef iter_ref(iter);
- PyObject* item;
- int64_t i = 0;
- // To allow people with long generators to only convert a subset, stop
- // consuming at size.
- while ((item = PyIter_Next(iter)) && i < size) {
- OwnedRef ref(item);
- if (ref.obj() == Py_None) {
- RETURN_NOT_OK(this->typed_builder_->AppendNull());
- } else {
- RETURN_NOT_OK(static_cast<Derived*>(this)->AppendItem(ref));
- }
- ++i;
- }
- if (size != i) {
- RETURN_NOT_OK(this->typed_builder_->Resize(i));
- }
} else {
- return Status::TypeError("Object is not a sequence or iterable");
+ return Status::TypeError("Object is not a sequence");
}
return Status::OK();
}
@@ -830,38 +822,56 @@ Status AppendPySequence(PyObject* obj, int64_t size,
return converter->AppendData(obj, size);
}
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out) {
+static Status ConvertPySequenceReal(PyObject* obj, int64_t size,
+ const std::shared_ptr<DataType>* type,
+ MemoryPool* pool, std::shared_ptr<Array>* out) {
PyAcquireGIL lock;
- std::shared_ptr<DataType> type;
- int64_t size;
- RETURN_NOT_OK(InferArrowTypeAndSize(obj, &size, &type));
- return ConvertPySequence(obj, pool, out, type, size);
-}
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
- const std::shared_ptr<DataType>& type, int64_t size) {
- PyAcquireGIL lock;
+ PyObject* seq;
+ ScopedRef tmp_seq_nanny;
+
+ std::shared_ptr<DataType> real_type;
+
+ RETURN_NOT_OK(ConvertToSequenceAndInferSize(obj, &seq, &size));
+ tmp_seq_nanny.reset(seq);
+ if (type == nullptr) {
+ RETURN_NOT_OK(InferArrowType(seq, &real_type));
+ } else {
+ real_type = *type;
+ }
+ DCHECK_GE(size, 0);
+
// Handle NA / NullType case
- if (type->id() == Type::NA) {
+ if (real_type->id() == Type::NA) {
out->reset(new NullArray(size));
return Status::OK();
}
// Give the sequence converter an array builder
std::unique_ptr<ArrayBuilder> builder;
- RETURN_NOT_OK(MakeBuilder(pool, type, &builder));
- RETURN_NOT_OK(AppendPySequence(obj, size, type, builder.get()));
+ RETURN_NOT_OK(MakeBuilder(pool, real_type, &builder));
+ RETURN_NOT_OK(AppendPySequence(seq, size, real_type, builder.get()));
return builder->Finish(out);
}
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
- const std::shared_ptr<DataType>& type) {
- int64_t size;
- {
- PyAcquireGIL lock;
- RETURN_NOT_OK(InferArrowSize(obj, &size));
- }
- return ConvertPySequence(obj, pool, out, type, size);
+Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out) {
+ return ConvertPySequenceReal(obj, -1, nullptr, pool, out);
+}
+
+Status ConvertPySequence(PyObject* obj, const std::shared_ptr<DataType>& type,
+ MemoryPool* pool, std::shared_ptr<Array>* out) {
+ return ConvertPySequenceReal(obj, -1, &type, pool, out);
+}
+
+Status ConvertPySequence(PyObject* obj, int64_t size, MemoryPool* pool,
+ std::shared_ptr<Array>* out) {
+ return ConvertPySequenceReal(obj, size, nullptr, pool, out);
+}
+
+Status ConvertPySequence(PyObject* obj, int64_t size,
+ const std::shared_ptr<DataType>& type, MemoryPool* pool,
+ std::shared_ptr<Array>* out) {
+ return ConvertPySequenceReal(obj, size, &type, pool, out);
}
Status CheckPythonBytesAreFixedLength(PyObject* obj, Py_ssize_t expected_length) {
diff --git a/cpp/src/arrow/python/builtin_convert.h b/cpp/src/arrow/python/builtin_convert.h
index cde7a1b..4bd3f08 100644
--- a/cpp/src/arrow/python/builtin_convert.h
+++ b/cpp/src/arrow/python/builtin_convert.h
@@ -39,11 +39,11 @@ class Status;
namespace py {
+// These three functions take a sequence input, not arbitrary iterables
ARROW_EXPORT arrow::Status InferArrowType(PyObject* obj,
std::shared_ptr<arrow::DataType>* out_type);
ARROW_EXPORT arrow::Status InferArrowTypeAndSize(
PyObject* obj, int64_t* size, std::shared_ptr<arrow::DataType>* out_type);
-ARROW_EXPORT arrow::Status InferArrowSize(PyObject* obj, int64_t* size);
ARROW_EXPORT arrow::Status AppendPySequence(PyObject* obj, int64_t size,
const std::shared_ptr<arrow::DataType>& type,
@@ -53,15 +53,21 @@ ARROW_EXPORT arrow::Status AppendPySequence(PyObject* obj, int64_t size,
ARROW_EXPORT
Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out);
-// Size inference
+// Type inference only
ARROW_EXPORT
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
- const std::shared_ptr<DataType>& type);
+Status ConvertPySequence(PyObject* obj, int64_t size, MemoryPool* pool,
+ std::shared_ptr<Array>* out);
+
+// Size inference only
+ARROW_EXPORT
+Status ConvertPySequence(PyObject* obj, const std::shared_ptr<DataType>& type,
+ MemoryPool* pool, std::shared_ptr<Array>* out);
// No inference
ARROW_EXPORT
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
- const std::shared_ptr<DataType>& type, int64_t size);
+Status ConvertPySequence(PyObject* obj, int64_t size,
+ const std::shared_ptr<DataType>& type, MemoryPool* pool,
+ std::shared_ptr<Array>* out);
ARROW_EXPORT
Status InvalidConversion(PyObject* obj, const std::string& expected_type_name,
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index cca9425..caeefd2 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -21,14 +21,21 @@ cdef _sequence_to_array(object sequence, object size, DataType type,
cdef shared_ptr[CArray] out
cdef int64_t c_size
if type is None:
- with nogil:
- check_status(ConvertPySequence(sequence, pool, &out))
+ if size is None:
+ with nogil:
+ check_status(ConvertPySequence(sequence, pool, &out))
+ else:
+ c_size = size
+ with nogil:
+ check_status(
+ ConvertPySequence(sequence, c_size, pool, &out)
+ )
else:
if size is None:
with nogil:
check_status(
ConvertPySequence(
- sequence, pool, &out, type.sp_type
+ sequence, type.sp_type, pool, &out,
)
)
else:
@@ -36,7 +43,7 @@ cdef _sequence_to_array(object sequence, object size, DataType type,
with nogil:
check_status(
ConvertPySequence(
- sequence, pool, &out, type.sp_type, c_size
+ sequence, c_size, type.sp_type, pool, &out,
)
)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 91bc96d..2e83f07 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -852,13 +852,14 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
shared_ptr[CDataType] GetTimestampType(TimeUnit unit)
CStatus ConvertPySequence(object obj, CMemoryPool* pool,
shared_ptr[CArray]* out)
- CStatus ConvertPySequence(object obj, CMemoryPool* pool,
- shared_ptr[CArray]* out,
- const shared_ptr[CDataType]& type)
- CStatus ConvertPySequence(object obj, CMemoryPool* pool,
- shared_ptr[CArray]* out,
+ CStatus ConvertPySequence(object obj, const shared_ptr[CDataType]& type,
+ CMemoryPool* pool, shared_ptr[CArray]* out)
+ CStatus ConvertPySequence(object obj, int64_t size, CMemoryPool* pool,
+ shared_ptr[CArray]* out)
+ CStatus ConvertPySequence(object obj, int64_t size,
const shared_ptr[CDataType]& type,
- int64_t size)
+ CMemoryPool* pool,
+ shared_ptr[CArray]* out)
CStatus NumPyDtypeToArrow(object dtype, shared_ptr[CDataType]* type)
diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py
index fa603b1..2b317df 100644
--- a/python/pyarrow/tests/test_convert_builtin.py
+++ b/python/pyarrow/tests/test_convert_builtin.py
@@ -23,6 +23,7 @@ import pyarrow as pa
import datetime
import decimal
+import itertools
import numpy as np
import six
@@ -68,6 +69,24 @@ def test_limited_iterator_size_underflow():
assert arr1.equals(arr2)
+def test_iterator_without_size():
+ expected = pa.array((0, 1, 2))
+ arr1 = pa.array(iter(range(3)))
+ assert arr1.equals(expected)
+ # Same with explicit type
+ arr1 = pa.array(iter(range(3)), type=pa.int64())
+ assert arr1.equals(expected)
+
+
+def test_infinite_iterator():
+ expected = pa.array((0, 1, 2))
+ arr1 = pa.array(itertools.count(0), size=3)
+ assert arr1.equals(expected)
+ # Same with explicit type
+ arr1 = pa.array(itertools.count(0), type=pa.int64(), size=3)
+ assert arr1.equals(expected)
+
+
def _as_list(xs):
return xs
--
To stop receiving notification emails like this one, please contact
wesm@apache.org.