You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/01/30 15:16:55 UTC

[arrow] branch master updated: ARROW-2033: [Python] Fix pa.array() with iterator input

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 715cf4b  ARROW-2033: [Python] Fix pa.array() with iterator input
715cf4b is described below

commit 715cf4b8497b81fbf4f452d0bec2e27afacdbf6e
Author: Wes McKinney <we...@twosigma.com>
AuthorDate: Tue Jan 30 10:16:51 2018 -0500

    ARROW-2033: [Python] Fix pa.array() with iterator input
    
    pa.array() was broken for iterator input (a one-shot iterator, as opposed to a re-iterable such as a list) unless both type and size were passed explicitly.
    
    Author: Wes McKinney <we...@twosigma.com>
    Author: Antoine Pitrou <pi...@free.fr>
    
    Closes #1513 from pitrou/ARROW-2033-pa-array-iterator and squashes the following commits:
    
    0013889a [Wes McKinney] Code review comments
    dc95be29 [Antoine Pitrou] Fix pyarrow.array with iterator input
---
 cpp/src/arrow/python/builtin_convert.cc      | 174 ++++++++++++++-------------
 cpp/src/arrow/python/builtin_convert.h       |  18 ++-
 python/pyarrow/array.pxi                     |  15 ++-
 python/pyarrow/includes/libarrow.pxd         |  13 +-
 python/pyarrow/tests/test_convert_builtin.py |  19 +++
 5 files changed, 141 insertions(+), 98 deletions(-)

diff --git a/cpp/src/arrow/python/builtin_convert.cc b/cpp/src/arrow/python/builtin_convert.cc
index f7a370c..b41c55d 100644
--- a/cpp/src/arrow/python/builtin_convert.cc
+++ b/cpp/src/arrow/python/builtin_convert.cc
@@ -172,38 +172,26 @@ class SeqVisitor {
   Status Visit(PyObject* obj, int level = 0) {
     max_nesting_level_ = std::max(max_nesting_level_, level);
 
-    // Loop through either a sequence or an iterator.
-    if (PySequence_Check(obj)) {
-      Py_ssize_t size = PySequence_Size(obj);
-      for (int64_t i = 0; i < size; ++i) {
-        OwnedRef ref;
-        if (PyArray_Check(obj)) {
-          auto array = reinterpret_cast<PyArrayObject*>(obj);
-          auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i));
-
-          ref.reset(PyArray_GETITEM(array, ptr));
-          RETURN_IF_PYERROR();
+    // Loop through a sequence
+    if (!PySequence_Check(obj))
+      return Status::TypeError("Object is not a sequence or iterable");
 
-          RETURN_NOT_OK(VisitElem(ref, level));
-        } else {
-          ref.reset(PySequence_GetItem(obj, i));
-          RETURN_IF_PYERROR();
-          RETURN_NOT_OK(VisitElem(ref, level));
-        }
-      }
-    } else if (PyObject_HasAttrString(obj, "__iter__")) {
-      OwnedRef iter(PyObject_GetIter(obj));
-      RETURN_IF_PYERROR();
+    Py_ssize_t size = PySequence_Size(obj);
+    for (int64_t i = 0; i < size; ++i) {
+      OwnedRef ref;
+      if (PyArray_Check(obj)) {
+        auto array = reinterpret_cast<PyArrayObject*>(obj);
+        auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i));
 
-      PyObject* item = NULLPTR;
-      while ((item = PyIter_Next(iter.obj()))) {
+        ref.reset(PyArray_GETITEM(array, ptr));
         RETURN_IF_PYERROR();
 
-        OwnedRef ref(item);
+        RETURN_NOT_OK(VisitElem(ref, level));
+      } else {
+        ref.reset(PySequence_GetItem(obj, i));
+        RETURN_IF_PYERROR();
         RETURN_NOT_OK(VisitElem(ref, level));
       }
-    } else {
-      return Status::TypeError("Object is not a sequence or iterable");
     }
     return Status::OK();
   }
@@ -285,25 +273,45 @@ class SeqVisitor {
   }
 };
 
-Status InferArrowSize(PyObject* obj, int64_t* size) {
+// Convert *obj* to a sequence if necessary
+// Fill *size* to its length.  If >= 0 on entry, *size* is an upper size
+// bound that may lead to truncation.
+Status ConvertToSequenceAndInferSize(PyObject* obj, PyObject** seq, int64_t* size) {
   if (PySequence_Check(obj)) {
-    *size = static_cast<int64_t>(PySequence_Size(obj));
-  } else if (PyObject_HasAttrString(obj, "__iter__")) {
+    // obj is already a sequence
+    int64_t real_size = static_cast<int64_t>(PySequence_Size(obj));
+    if (*size < 0) {
+      *size = real_size;
+    } else {
+      *size = std::min(real_size, *size);
+    }
+    Py_INCREF(obj);
+    *seq = obj;
+  } else if (*size < 0) {
+    // unknown size, exhaust iterator
+    *seq = PySequence_List(obj);
+    RETURN_IF_PYERROR();
+    *size = static_cast<int64_t>(PyList_GET_SIZE(*seq));
+  } else {
+    // size is known but iterator could be infinite
+    Py_ssize_t i, n = *size;
     PyObject* iter = PyObject_GetIter(obj);
+    RETURN_IF_PYERROR();
     OwnedRef iter_ref(iter);
-    *size = 0;
-    PyObject* item;
-    while ((item = PyIter_Next(iter))) {
-      OwnedRef item_ref(item);
-      *size += 1;
+    PyObject* lst = PyList_New(n);
+    RETURN_IF_PYERROR();
+    for (i = 0; i < n; i++) {
+      PyObject* item = PyIter_Next(iter);
+      if (!item) break;
+      PyList_SET_ITEM(lst, i, item);
     }
-  } else {
-    return Status::TypeError("Object is not a sequence or iterable");
-  }
-  if (PyErr_Occurred()) {
-    // Not a sequence
-    PyErr_Clear();
-    return Status::TypeError("Object is not a sequence or iterable");
+    // Shrink list if len(iterator) < size
+    if (i < n && PyList_SetSlice(lst, i, n, NULL)) {
+      Py_DECREF(lst);
+      return Status::UnknownError("failed to resize list");
+    }
+    *seq = lst;
+    *size = std::min<int64_t>(i, *size);
   }
   return Status::OK();
 }
@@ -325,7 +333,10 @@ Status InferArrowType(PyObject* obj, std::shared_ptr<DataType>* out_type) {
 
 Status InferArrowTypeAndSize(PyObject* obj, int64_t* size,
                              std::shared_ptr<DataType>* out_type) {
-  RETURN_NOT_OK(InferArrowSize(obj, size));
+  if (!PySequence_Check(obj)) {
+    return Status::TypeError("Object is not a sequence");
+  }
+  *size = static_cast<int64_t>(PySequence_Size(obj));
 
   // For 0-length sequences, refuse to guess
   if (*size == 0) {
@@ -382,27 +393,8 @@ class TypedConverterVisitor : public TypedConverter<BuilderType> {
           RETURN_NOT_OK(static_cast<Derived*>(this)->AppendItem(ref));
         }
       }
-    } else if (PyObject_HasAttrString(obj, "__iter__")) {
-      PyObject* iter = PyObject_GetIter(obj);
-      OwnedRef iter_ref(iter);
-      PyObject* item;
-      int64_t i = 0;
-      // To allow people with long generators to only convert a subset, stop
-      // consuming at size.
-      while ((item = PyIter_Next(iter)) && i < size) {
-        OwnedRef ref(item);
-        if (ref.obj() == Py_None) {
-          RETURN_NOT_OK(this->typed_builder_->AppendNull());
-        } else {
-          RETURN_NOT_OK(static_cast<Derived*>(this)->AppendItem(ref));
-        }
-        ++i;
-      }
-      if (size != i) {
-        RETURN_NOT_OK(this->typed_builder_->Resize(i));
-      }
     } else {
-      return Status::TypeError("Object is not a sequence or iterable");
+      return Status::TypeError("Object is not a sequence");
     }
     return Status::OK();
   }
@@ -830,38 +822,56 @@ Status AppendPySequence(PyObject* obj, int64_t size,
   return converter->AppendData(obj, size);
 }
 
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out) {
+static Status ConvertPySequenceReal(PyObject* obj, int64_t size,
+                                    const std::shared_ptr<DataType>* type,
+                                    MemoryPool* pool, std::shared_ptr<Array>* out) {
   PyAcquireGIL lock;
-  std::shared_ptr<DataType> type;
-  int64_t size;
-  RETURN_NOT_OK(InferArrowTypeAndSize(obj, &size, &type));
-  return ConvertPySequence(obj, pool, out, type, size);
-}
 
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
-                         const std::shared_ptr<DataType>& type, int64_t size) {
-  PyAcquireGIL lock;
+  PyObject* seq;
+  ScopedRef tmp_seq_nanny;
+
+  std::shared_ptr<DataType> real_type;
+
+  RETURN_NOT_OK(ConvertToSequenceAndInferSize(obj, &seq, &size));
+  tmp_seq_nanny.reset(seq);
+  if (type == nullptr) {
+    RETURN_NOT_OK(InferArrowType(seq, &real_type));
+  } else {
+    real_type = *type;
+  }
+  DCHECK_GE(size, 0);
+
   // Handle NA / NullType case
-  if (type->id() == Type::NA) {
+  if (real_type->id() == Type::NA) {
     out->reset(new NullArray(size));
     return Status::OK();
   }
 
   // Give the sequence converter an array builder
   std::unique_ptr<ArrayBuilder> builder;
-  RETURN_NOT_OK(MakeBuilder(pool, type, &builder));
-  RETURN_NOT_OK(AppendPySequence(obj, size, type, builder.get()));
+  RETURN_NOT_OK(MakeBuilder(pool, real_type, &builder));
+  RETURN_NOT_OK(AppendPySequence(seq, size, real_type, builder.get()));
   return builder->Finish(out);
 }
 
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
-                         const std::shared_ptr<DataType>& type) {
-  int64_t size;
-  {
-    PyAcquireGIL lock;
-    RETURN_NOT_OK(InferArrowSize(obj, &size));
-  }
-  return ConvertPySequence(obj, pool, out, type, size);
+Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out) {
+  return ConvertPySequenceReal(obj, -1, nullptr, pool, out);
+}
+
+Status ConvertPySequence(PyObject* obj, const std::shared_ptr<DataType>& type,
+                         MemoryPool* pool, std::shared_ptr<Array>* out) {
+  return ConvertPySequenceReal(obj, -1, &type, pool, out);
+}
+
+Status ConvertPySequence(PyObject* obj, int64_t size, MemoryPool* pool,
+                         std::shared_ptr<Array>* out) {
+  return ConvertPySequenceReal(obj, size, nullptr, pool, out);
+}
+
+Status ConvertPySequence(PyObject* obj, int64_t size,
+                         const std::shared_ptr<DataType>& type, MemoryPool* pool,
+                         std::shared_ptr<Array>* out) {
+  return ConvertPySequenceReal(obj, size, &type, pool, out);
 }
 
 Status CheckPythonBytesAreFixedLength(PyObject* obj, Py_ssize_t expected_length) {
diff --git a/cpp/src/arrow/python/builtin_convert.h b/cpp/src/arrow/python/builtin_convert.h
index cde7a1b..4bd3f08 100644
--- a/cpp/src/arrow/python/builtin_convert.h
+++ b/cpp/src/arrow/python/builtin_convert.h
@@ -39,11 +39,11 @@ class Status;
 
 namespace py {
 
+// These three functions take a sequence input, not arbitrary iterables
 ARROW_EXPORT arrow::Status InferArrowType(PyObject* obj,
                                           std::shared_ptr<arrow::DataType>* out_type);
 ARROW_EXPORT arrow::Status InferArrowTypeAndSize(
     PyObject* obj, int64_t* size, std::shared_ptr<arrow::DataType>* out_type);
-ARROW_EXPORT arrow::Status InferArrowSize(PyObject* obj, int64_t* size);
 
 ARROW_EXPORT arrow::Status AppendPySequence(PyObject* obj, int64_t size,
                                             const std::shared_ptr<arrow::DataType>& type,
@@ -53,15 +53,21 @@ ARROW_EXPORT arrow::Status AppendPySequence(PyObject* obj, int64_t size,
 ARROW_EXPORT
 Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out);
 
-// Size inference
+// Type inference only
 ARROW_EXPORT
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
-                         const std::shared_ptr<DataType>& type);
+Status ConvertPySequence(PyObject* obj, int64_t size, MemoryPool* pool,
+                         std::shared_ptr<Array>* out);
+
+// Size inference only
+ARROW_EXPORT
+Status ConvertPySequence(PyObject* obj, const std::shared_ptr<DataType>& type,
+                         MemoryPool* pool, std::shared_ptr<Array>* out);
 
 // No inference
 ARROW_EXPORT
-Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
-                         const std::shared_ptr<DataType>& type, int64_t size);
+Status ConvertPySequence(PyObject* obj, int64_t size,
+                         const std::shared_ptr<DataType>& type, MemoryPool* pool,
+                         std::shared_ptr<Array>* out);
 
 ARROW_EXPORT
 Status InvalidConversion(PyObject* obj, const std::string& expected_type_name,
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index cca9425..caeefd2 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -21,14 +21,21 @@ cdef _sequence_to_array(object sequence, object size, DataType type,
     cdef shared_ptr[CArray] out
     cdef int64_t c_size
     if type is None:
-        with nogil:
-            check_status(ConvertPySequence(sequence, pool, &out))
+        if size is None:
+            with nogil:
+                check_status(ConvertPySequence(sequence, pool, &out))
+        else:
+            c_size = size
+            with nogil:
+                check_status(
+                    ConvertPySequence(sequence, c_size, pool, &out)
+                )
     else:
         if size is None:
             with nogil:
                 check_status(
                     ConvertPySequence(
-                        sequence, pool, &out, type.sp_type
+                        sequence, type.sp_type, pool, &out,
                     )
                 )
         else:
@@ -36,7 +43,7 @@ cdef _sequence_to_array(object sequence, object size, DataType type,
             with nogil:
                 check_status(
                     ConvertPySequence(
-                        sequence, pool, &out, type.sp_type, c_size
+                        sequence, c_size, type.sp_type, pool, &out,
                     )
                 )
 
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 91bc96d..2e83f07 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -852,13 +852,14 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
     shared_ptr[CDataType] GetTimestampType(TimeUnit unit)
     CStatus ConvertPySequence(object obj, CMemoryPool* pool,
                               shared_ptr[CArray]* out)
-    CStatus ConvertPySequence(object obj, CMemoryPool* pool,
-                              shared_ptr[CArray]* out,
-                              const shared_ptr[CDataType]& type)
-    CStatus ConvertPySequence(object obj, CMemoryPool* pool,
-                              shared_ptr[CArray]* out,
+    CStatus ConvertPySequence(object obj, const shared_ptr[CDataType]& type,
+                              CMemoryPool* pool, shared_ptr[CArray]* out)
+    CStatus ConvertPySequence(object obj, int64_t size, CMemoryPool* pool,
+                              shared_ptr[CArray]* out)
+    CStatus ConvertPySequence(object obj, int64_t size,
                               const shared_ptr[CDataType]& type,
-                              int64_t size)
+                              CMemoryPool* pool,
+                              shared_ptr[CArray]* out)
 
     CStatus NumPyDtypeToArrow(object dtype, shared_ptr[CDataType]* type)
 
diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py
index fa603b1..2b317df 100644
--- a/python/pyarrow/tests/test_convert_builtin.py
+++ b/python/pyarrow/tests/test_convert_builtin.py
@@ -23,6 +23,7 @@ import pyarrow as pa
 
 import datetime
 import decimal
+import itertools
 import numpy as np
 import six
 
@@ -68,6 +69,24 @@ def test_limited_iterator_size_underflow():
     assert arr1.equals(arr2)
 
 
+def test_iterator_without_size():
+    expected = pa.array((0, 1, 2))
+    arr1 = pa.array(iter(range(3)))
+    assert arr1.equals(expected)
+    # Same with explicit type
+    arr1 = pa.array(iter(range(3)), type=pa.int64())
+    assert arr1.equals(expected)
+
+
+def test_infinite_iterator():
+    expected = pa.array((0, 1, 2))
+    arr1 = pa.array(itertools.count(0), size=3)
+    assert arr1.equals(expected)
+    # Same with explicit type
+    arr1 = pa.array(itertools.count(0), type=pa.int64(), size=3)
+    assert arr1.equals(expected)
+
+
 def _as_list(xs):
     return xs
 

-- 
To stop receiving notification emails like this one, please contact
wesm@apache.org.