You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/31 17:46:35 UTC

[06/14] arrow git commit: ARROW-1054: [Python] Fix test failure on pandas 0.19.2, some refactoring

ARROW-1054: [Python] Fix test failure on pandas 0.19.2, some refactoring

For esoteric reasons, `MultiIndex.from_arrays` rejects non-writeable NumPy arrays: in pandas 0.19.2 its internal factorize step refuses non-writeable input. This problem is not present in pandas 0.20.1.

Author: Wes McKinney <we...@twosigma.com>

Closes #705 from wesm/ARROW-1054 and squashes the following commits:

f7dfbb24 [Wes McKinney] Factor out table to blocks into pandas_compat, fix pandas 0.19.2 test failure


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a6e77f4b
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a6e77f4b
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a6e77f4b

Branch: refs/heads/master
Commit: a6e77f4b2f4e3d5affc2aef3c5f18fa534b90472
Parents: cf4ef5e
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri May 19 11:12:33 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed May 31 13:45:48 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/python/pandas_convert.cc |  2 +-
 python/pyarrow/ipc.py                  |  3 +-
 python/pyarrow/pandas_compat.py        | 73 +++++++++++++++++++++++++++
 python/pyarrow/parquet.py              |  1 -
 python/pyarrow/table.pxi               | 76 ++---------------------------
 5 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/cpp/src/arrow/python/pandas_convert.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/pandas_convert.cc b/cpp/src/arrow/python/pandas_convert.cc
index 96dd09a..ac61cbc 100644
--- a/cpp/src/arrow/python/pandas_convert.cc
+++ b/cpp/src/arrow/python/pandas_convert.cc
@@ -1102,7 +1102,7 @@ static inline PyObject* NewArray1DFromType(
 
   set_numpy_metadata(type, arrow_type, descr);
   return PyArray_NewFromDescr(&PyArray_Type, descr, 1, dims, nullptr, data,
-      NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY, nullptr);
+      NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY | NPY_ARRAY_WRITEABLE, nullptr);
 }
 
 class PandasBlock {

http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index a61d746..6173299 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -158,4 +158,5 @@ def deserialize_pandas(buf, nthreads=1):
     """
     buffer_reader = pa.BufferReader(buf)
     reader = pa.RecordBatchFileReader(buffer_reader)
-    return reader.read_all().to_pandas(nthreads=nthreads)
+    table = reader.read_all()
+    return table.to_pandas(nthreads=nthreads)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/python/pyarrow/pandas_compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 255b31a..9711b72 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -102,3 +102,76 @@ def construct_metadata(df, index_levels, preserve_index):
             }
         ).encode('utf8')
     }
+
+
+def table_to_blockmanager(table, nthreads=1):
+    import pandas.core.internals as _int
+    from pyarrow.compat import DatetimeTZDtype
+    import pyarrow.lib as lib
+
+    block_table = table
+
+    index_columns = []
+    index_arrays = []
+    index_names = []
+    schema = table.schema
+    row_count = table.num_rows
+    metadata = schema.metadata
+
+    if metadata is not None and b'pandas' in metadata:
+        pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
+        index_columns = pandas_metadata['index_columns']
+
+    for name in index_columns:
+        i = schema.get_field_index(name)
+        if i != -1:
+            col = table.column(i)
+            index_name = (None if is_unnamed_index_level(name)
+                          else name)
+            values = col.to_pandas().values
+            if not values.flags.writeable:
+                # ARROW-1054: in pandas 0.19.2, factorize will reject
+                # non-writeable arrays when calling MultiIndex.from_arrays
+                values = values.copy()
+
+            index_arrays.append(values)
+            index_names.append(index_name)
+            block_table = block_table.remove_column(
+                block_table.schema.get_field_index(name)
+            )
+
+    result = lib.table_to_blocks(block_table, nthreads)
+
+    blocks = []
+    for item in result:
+        block_arr = item['block']
+        placement = item['placement']
+        if 'dictionary' in item:
+            cat = pd.Categorical(block_arr,
+                                 categories=item['dictionary'],
+                                 ordered=False, fastpath=True)
+            block = _int.make_block(cat, placement=placement,
+                                    klass=_int.CategoricalBlock,
+                                    fastpath=True)
+        elif 'timezone' in item:
+            dtype = DatetimeTZDtype('ns', tz=item['timezone'])
+            block = _int.make_block(block_arr, placement=placement,
+                                    klass=_int.DatetimeTZBlock,
+                                    dtype=dtype, fastpath=True)
+        else:
+            block = _int.make_block(block_arr, placement=placement)
+        blocks.append(block)
+
+    if len(index_arrays) > 1:
+        index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
+    elif len(index_arrays) == 1:
+        index = pd.Index(index_arrays[0], name=index_names[0])
+    else:
+        index = pd.RangeIndex(row_count)
+
+    axes = [
+        [column.name for column in block_table.itercolumns()],
+        index
+    ]
+
+    return _int.BlockManager(blocks, axes)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index f59a719..dc26dab 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import itertools
 import json
 
 import six

http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 25a4f84..3f67ba4 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -559,86 +559,20 @@ cdef class RecordBatch:
         return pyarrow_wrap_batch(batch)
 
 
-cdef table_to_blockmanager(const shared_ptr[CTable]& ctable, int nthreads):
-    import pandas.core.internals as _int
-    from pandas import RangeIndex, Categorical
-    from pyarrow.compat import DatetimeTZDtype
-
-    cdef:
-        Table table = pyarrow_wrap_table(ctable)
-        Table block_table = pyarrow_wrap_table(ctable)
-        Schema schema = table.schema
-
-        size_t row_count = table.num_rows
-        size_t total_columns = table.num_columns
-
-        dict metadata = schema.metadata
-        dict pandas_metadata = None
-
-        list index_columns = []
-        list index_arrays = []
-
-    if metadata is not None and b'pandas' in metadata:
-        pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
-        index_columns = pandas_metadata['index_columns']
-
-    cdef:
-        Column col
-        int64_t i
-
-    for name in index_columns:
-        i = schema.get_field_index(name)
-        if i != -1:
-            col = table.column(i)
-            index_name = None if pdcompat.is_unnamed_index_level(name) else name
-            index_arrays.append(
-                pd.Index(col.to_pandas().values, name=index_name)
-            )
-            block_table = block_table.remove_column(
-                block_table.schema.get_field_index(name)
-            )
-
+def table_to_blocks(Table table, int nthreads):
     cdef:
         PyObject* result_obj
-        shared_ptr[CTable] c_block_table = block_table.sp_table
+        shared_ptr[CTable] c_table = table.sp_table
 
     with nogil:
         check_status(
             libarrow.ConvertTableToPandas(
-                c_block_table, nthreads, &result_obj
+                c_table, nthreads, &result_obj
             )
         )
 
-    result = PyObject_to_object(result_obj)
-
-    blocks = []
-    for item in result:
-        block_arr = item['block']
-        placement = item['placement']
-        if 'dictionary' in item:
-            cat = Categorical(block_arr,
-                              categories=item['dictionary'],
-                              ordered=False, fastpath=True)
-            block = _int.make_block(cat, placement=placement,
-                                    klass=_int.CategoricalBlock,
-                                    fastpath=True)
-        elif 'timezone' in item:
-            dtype = DatetimeTZDtype('ns', tz=item['timezone'])
-            block = _int.make_block(block_arr, placement=placement,
-                                    klass=_int.DatetimeTZBlock,
-                                    dtype=dtype, fastpath=True)
-        else:
-            block = _int.make_block(block_arr, placement=placement)
-        blocks.append(block)
-
-    cdef list axes = [
-        [column.name for column in block_table.itercolumns()],
-        pd.MultiIndex.from_arrays(
-            index_arrays
-        ) if index_arrays else pd.RangeIndex(row_count),
-    ]
+    return PyObject_to_object(result_obj)
 
-    return _int.BlockManager(blocks, axes)
 
 
 cdef class Table:
@@ -829,7 +763,7 @@ cdef class Table:
         if nthreads is None:
             nthreads = cpu_count()
 
-        mgr = table_to_blockmanager(self.sp_table, nthreads)
+        mgr = pdcompat.table_to_blockmanager(self, nthreads)
         return pd.DataFrame(mgr)
 
     def to_pydict(self):