You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/31 17:46:35 UTC
[06/14] arrow git commit: ARROW-1054: [Python] Fix test failure on
pandas 0.19.2, some refactoring
ARROW-1054: [Python] Fix test failure on pandas 0.19.2, some refactoring
For esoteric reasons, `MultiIndex.from_arrays` rejects non-writeable NumPy arrays. This problem isn't present in pandas 0.20.1
Author: Wes McKinney <we...@twosigma.com>
Closes #705 from wesm/ARROW-1054 and squashes the following commits:
f7dfbb24 [Wes McKinney] Factor out table to blocks into pandas_compat, fix pandas 0.19.2 test failure
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a6e77f4b
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a6e77f4b
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a6e77f4b
Branch: refs/heads/master
Commit: a6e77f4b2f4e3d5affc2aef3c5f18fa534b90472
Parents: cf4ef5e
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri May 19 11:12:33 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed May 31 13:45:48 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/python/pandas_convert.cc | 2 +-
python/pyarrow/ipc.py | 3 +-
python/pyarrow/pandas_compat.py | 73 +++++++++++++++++++++++++++
python/pyarrow/parquet.py | 1 -
python/pyarrow/table.pxi | 76 ++---------------------------
5 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/cpp/src/arrow/python/pandas_convert.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/pandas_convert.cc b/cpp/src/arrow/python/pandas_convert.cc
index 96dd09a..ac61cbc 100644
--- a/cpp/src/arrow/python/pandas_convert.cc
+++ b/cpp/src/arrow/python/pandas_convert.cc
@@ -1102,7 +1102,7 @@ static inline PyObject* NewArray1DFromType(
set_numpy_metadata(type, arrow_type, descr);
return PyArray_NewFromDescr(&PyArray_Type, descr, 1, dims, nullptr, data,
- NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY, nullptr);
+ NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY | NPY_ARRAY_WRITEABLE, nullptr);
}
class PandasBlock {
http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index a61d746..6173299 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -158,4 +158,5 @@ def deserialize_pandas(buf, nthreads=1):
"""
buffer_reader = pa.BufferReader(buf)
reader = pa.RecordBatchFileReader(buffer_reader)
- return reader.read_all().to_pandas(nthreads=nthreads)
+ table = reader.read_all()
+ return table.to_pandas(nthreads=nthreads)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/python/pyarrow/pandas_compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 255b31a..9711b72 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -102,3 +102,76 @@ def construct_metadata(df, index_levels, preserve_index):
}
).encode('utf8')
}
+
+
def table_to_blockmanager(table, nthreads=1):
    """Convert a pyarrow Table into a pandas BlockManager.

    Rebuilds the DataFrame index from the table's ``b'pandas'`` schema
    metadata (written when the table was created from a DataFrame),
    converts the remaining columns to pandas blocks via
    ``pyarrow.lib.table_to_blocks``, and assembles everything into a
    ``pandas.core.internals.BlockManager`` suitable for
    ``pd.DataFrame(mgr)``.

    NOTE(review): ``json``, ``pd`` and ``is_unnamed_index_level`` are
    expected to come from the enclosing module's scope
    (pyarrow/pandas_compat.py) — confirm against the full module.

    Parameters
    ----------
    table : pyarrow.Table
        Table to convert.
    nthreads : int, default 1
        Number of threads passed through to the block conversion.

    Returns
    -------
    pandas.core.internals.BlockManager
    """
    # Function-local imports in the original code; presumably to defer
    # pandas-internals / extension imports — TODO confirm intent.
    import pandas.core.internals as _int
    from pyarrow.compat import DatetimeTZDtype
    import pyarrow.lib as lib

    # block_table starts as the full table; index columns are removed
    # from it below so only data columns are converted to blocks.
    block_table = table

    index_columns = []
    index_arrays = []
    index_names = []
    schema = table.schema
    row_count = table.num_rows
    metadata = schema.metadata

    # Index column names are recorded in the 'pandas' schema metadata.
    if metadata is not None and b'pandas' in metadata:
        pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
        index_columns = pandas_metadata['index_columns']

    # Extract each stored index column; what remains in block_table
    # becomes the DataFrame's data blocks.
    for name in index_columns:
        i = schema.get_field_index(name)
        if i != -1:
            col = table.column(i)
            # Unnamed levels were stored under a synthetic name; map
            # them back to a None index name.
            index_name = (None if is_unnamed_index_level(name)
                          else name)
            values = col.to_pandas().values
            if not values.flags.writeable:
                # ARROW-1054: in pandas 0.19.2, factorize will reject
                # non-writeable arrays when calling MultiIndex.from_arrays
                values = values.copy()

            index_arrays.append(values)
            index_names.append(index_name)
            # Look the index up again on block_table: earlier removals
            # shift field positions, so `i` may be stale here.
            block_table = block_table.remove_column(
                block_table.schema.get_field_index(name)
            )

    # Each result item is a dict with 'block' (ndarray) and 'placement',
    # plus optionally 'dictionary' or 'timezone' markers.
    result = lib.table_to_blocks(block_table, nthreads)

    blocks = []
    for item in result:
        block_arr = item['block']
        placement = item['placement']
        if 'dictionary' in item:
            # Dictionary-encoded column -> pandas CategoricalBlock.
            cat = pd.Categorical(block_arr,
                                 categories=item['dictionary'],
                                 ordered=False, fastpath=True)
            block = _int.make_block(cat, placement=placement,
                                    klass=_int.CategoricalBlock,
                                    fastpath=True)
        elif 'timezone' in item:
            # Timezone-aware timestamps -> DatetimeTZBlock with a
            # tz-carrying dtype.
            dtype = DatetimeTZDtype('ns', tz=item['timezone'])
            block = _int.make_block(block_arr, placement=placement,
                                    klass=_int.DatetimeTZBlock,
                                    dtype=dtype, fastpath=True)
        else:
            block = _int.make_block(block_arr, placement=placement)
        blocks.append(block)

    # Rebuild the index: MultiIndex for several levels, plain Index for
    # one, RangeIndex when no index columns were stored.
    if len(index_arrays) > 1:
        index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
    elif len(index_arrays) == 1:
        index = pd.Index(index_arrays[0], name=index_names[0])
    else:
        index = pd.RangeIndex(row_count)

    # BlockManager axes: [column labels, row index].
    axes = [
        [column.name for column in block_table.itercolumns()],
        index
    ]

    return _int.BlockManager(blocks, axes)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index f59a719..dc26dab 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-import itertools
import json
import six
http://git-wip-us.apache.org/repos/asf/arrow/blob/a6e77f4b/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 25a4f84..3f67ba4 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -559,86 +559,20 @@ cdef class RecordBatch:
return pyarrow_wrap_batch(batch)
-cdef table_to_blockmanager(const shared_ptr[CTable]& ctable, int nthreads):
- import pandas.core.internals as _int
- from pandas import RangeIndex, Categorical
- from pyarrow.compat import DatetimeTZDtype
-
- cdef:
- Table table = pyarrow_wrap_table(ctable)
- Table block_table = pyarrow_wrap_table(ctable)
- Schema schema = table.schema
-
- size_t row_count = table.num_rows
- size_t total_columns = table.num_columns
-
- dict metadata = schema.metadata
- dict pandas_metadata = None
-
- list index_columns = []
- list index_arrays = []
-
- if metadata is not None and b'pandas' in metadata:
- pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
- index_columns = pandas_metadata['index_columns']
-
- cdef:
- Column col
- int64_t i
-
- for name in index_columns:
- i = schema.get_field_index(name)
- if i != -1:
- col = table.column(i)
- index_name = None if pdcompat.is_unnamed_index_level(name) else name
- index_arrays.append(
- pd.Index(col.to_pandas().values, name=index_name)
- )
- block_table = block_table.remove_column(
- block_table.schema.get_field_index(name)
- )
-
+def table_to_blocks(Table table, int nthreads):
cdef:
PyObject* result_obj
- shared_ptr[CTable] c_block_table = block_table.sp_table
+ shared_ptr[CTable] c_table = table.sp_table
with nogil:
check_status(
libarrow.ConvertTableToPandas(
- c_block_table, nthreads, &result_obj
+ c_table, nthreads, &result_obj
)
)
- result = PyObject_to_object(result_obj)
-
- blocks = []
- for item in result:
- block_arr = item['block']
- placement = item['placement']
- if 'dictionary' in item:
- cat = Categorical(block_arr,
- categories=item['dictionary'],
- ordered=False, fastpath=True)
- block = _int.make_block(cat, placement=placement,
- klass=_int.CategoricalBlock,
- fastpath=True)
- elif 'timezone' in item:
- dtype = DatetimeTZDtype('ns', tz=item['timezone'])
- block = _int.make_block(block_arr, placement=placement,
- klass=_int.DatetimeTZBlock,
- dtype=dtype, fastpath=True)
- else:
- block = _int.make_block(block_arr, placement=placement)
- blocks.append(block)
-
- cdef list axes = [
- [column.name for column in block_table.itercolumns()],
- pd.MultiIndex.from_arrays(
- index_arrays
- ) if index_arrays else pd.RangeIndex(row_count),
- ]
+ return PyObject_to_object(result_obj)
- return _int.BlockManager(blocks, axes)
cdef class Table:
@@ -829,7 +763,7 @@ cdef class Table:
if nthreads is None:
nthreads = cpu_count()
- mgr = table_to_blockmanager(self.sp_table, nthreads)
+ mgr = pdcompat.table_to_blockmanager(self, nthreads)
return pd.DataFrame(mgr)
def to_pydict(self):