You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/02 19:53:03 UTC
arrow git commit: ARROW-369: [Python] Convert multiple record batches
at once to Pandas
Repository: arrow
Updated Branches:
refs/heads/master ebe7dc8f5 -> b5de9e56d
ARROW-369: [Python] Convert multiple record batches at once to Pandas
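Modified the Pandas adapter so that `ConvertColumnToPandas` handles columns with multiple chunks. This extends the pyarrow public API with a way to convert a list of Arrow RecordBatches (all with equal schemas) into a single Pandas DataFrame. The description originally proposed a class `RecordBatchList` with a static method `toPandas`. As merged (see the diff below), this is exposed as the free function `pyarrow.dataframe_from_batches(batches)`.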
Adds unit tests in test_table.py that exercise the conversion for each column type specialization, and that check an error is raised when the batch schemas differ.
Author: Bryan Cutler <cu...@gmail.com>
Closes #216 from BryanCutler/multi-batch-toPandas-ARROW-369 and squashes the following commits:
b6c9986 [Bryan Cutler] fixed formatting
edf056e [Bryan Cutler] simplified with pyarrow.schema.Schema.equals
068bc1b [Bryan Cutler] Merge remote-tracking branch 'upstream/master' into multi-batch-toPandas-ARROW-369
da65345 [Bryan Cutler] fixed test case for schema checking
9edb0ba [Bryan Cutler] used auto keyword where some typecasting was done in ConvertValues
bd2a720 [Bryan Cutler] added testcase for schema not equal, disabled now
c3d7e8f [Bryan Cutler] Changed conversion to make Table from columns first, now conversion is now just a free function
3ee51e6 [Bryan Cutler] cleanup
398b18d [Bryan Cutler] Fixed case for Integer specialization without nulls
7b29a55 [Bryan Cutler] Initial working version of RecordBatch list to_pandas, need more tests and cleanup
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b5de9e56
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b5de9e56
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b5de9e56
Branch: refs/heads/master
Commit: b5de9e56db08480050445dd883643448af12b81b
Parents: ebe7dc8
Author: Bryan Cutler <cu...@gmail.com>
Authored: Fri Dec 2 14:34:47 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Dec 2 14:34:47 2016 -0500
----------------------------------------------------------------------
python/pyarrow/__init__.py | 4 +-
python/pyarrow/includes/libarrow.pxd | 3 +
python/pyarrow/table.pyx | 47 +++++++
python/pyarrow/tests/test_table.py | 35 +++++
python/src/pyarrow/adapters/pandas.cc | 209 ++++++++++++++++++-----------
5 files changed, 219 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 775ce7e..d4d0f00 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -41,5 +41,7 @@ from pyarrow.schema import (null, bool_,
list_, struct, field,
DataType, Field, Schema, schema)
-from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe
+from pyarrow.table import (Column, RecordBatch, dataframe_from_batches, Table,
+ from_pandas_dataframe)
+
from pyarrow.version import version as __version__
http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 19da408..350ebe3 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -158,6 +158,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CColumn(const shared_ptr[CField]& field,
const shared_ptr[CArray]& data)
+ CColumn(const shared_ptr[CField]& field,
+ const vector[shared_ptr[CArray]]& chunks)
+
int64_t length()
int64_t null_count()
const c_string& name()
http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index a6715b1..45cf7be 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -28,6 +28,7 @@ cimport pyarrow.includes.pyarrow as pyarrow
import pyarrow.config
from pyarrow.array cimport Array, box_arrow_array
+from pyarrow.error import ArrowException
from pyarrow.error cimport check_status
from pyarrow.schema cimport box_data_type, box_schema
@@ -414,6 +415,52 @@ cdef class RecordBatch:
return result
+def dataframe_from_batches(batches):
+ """
+ Convert a list of Arrow RecordBatches to a pandas.DataFrame
+
+ Parameters
+ ----------
+
+ batches: list of RecordBatch
+ RecordBatch list to be converted, schemas must be equal
+ """
+
+ cdef:
+ vector[shared_ptr[CArray]] c_array_chunks
+ vector[shared_ptr[CColumn]] c_columns
+ shared_ptr[CTable] c_table
+ Array arr
+ Schema schema
+
+ import pandas as pd
+
+ schema = batches[0].schema
+
+ # check schemas are equal
+ if any((not schema.equals(other.schema) for other in batches[1:])):
+ raise ArrowException("Error converting list of RecordBatches to "
+ "DataFrame, not all schemas are equal")
+
+ cdef int K = batches[0].num_columns
+
+ # create chunked columns from the batches
+ c_columns.resize(K)
+ for i in range(K):
+ for batch in batches:
+ arr = batch[i]
+ c_array_chunks.push_back(arr.sp_array)
+ c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i),
+ c_array_chunks))
+ c_array_chunks.clear()
+
+ # create a Table from columns and convert to DataFrame
+ c_table.reset(new CTable('', schema.sp_schema, c_columns))
+ table = Table()
+ table.init(c_table)
+ return table.to_pandas()
+
+
cdef class Table:
"""
A collection of top-level named, equal length Arrow arrays.
http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 4c9d302..dc4f37a 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -19,6 +19,7 @@ import numpy as np
from pandas.util.testing import assert_frame_equal
import pandas as pd
+import pytest
import pyarrow as pa
@@ -50,6 +51,40 @@ def test_recordbatch_from_to_pandas():
assert_frame_equal(data, result)
+def test_recordbatchlist_to_pandas():
+ data1 = pd.DataFrame({
+ 'c1': np.array([1, 1, 2], dtype='uint32'),
+ 'c2': np.array([1.0, 2.0, 3.0], dtype='float64'),
+ 'c3': [True, None, False],
+ 'c4': ['foo', 'bar', None]
+ })
+
+ data2 = pd.DataFrame({
+ 'c1': np.array([3, 5], dtype='uint32'),
+ 'c2': np.array([4.0, 5.0], dtype='float64'),
+ 'c3': [True, True],
+ 'c4': ['baz', 'qux']
+ })
+
+ batch1 = pa.RecordBatch.from_pandas(data1)
+ batch2 = pa.RecordBatch.from_pandas(data2)
+
+ result = pa.dataframe_from_batches([batch1, batch2])
+ data = pd.concat([data1, data2], ignore_index=True)
+ assert_frame_equal(data, result)
+
+
+def test_recordbatchlist_schema_equals():
+ data1 = pd.DataFrame({'c1': np.array([1], dtype='uint32')})
+ data2 = pd.DataFrame({'c1': np.array([4.0, 5.0], dtype='float64')})
+
+ batch1 = pa.RecordBatch.from_pandas(data1)
+ batch2 = pa.RecordBatch.from_pandas(data2)
+
+ with pytest.raises(pa.ArrowException):
+ pa.dataframe_from_batches([batch1, batch2])
+
+
def test_table_basics():
data = [
pa.from_pylist(range(5)),
http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 1f5b700..adb27e8 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -597,14 +597,10 @@ class ArrowDeserializer {
Status Convert(PyObject** out) {
const std::shared_ptr<arrow::ChunkedArray> data = col_->data();
- if (data->num_chunks() > 1) {
- return Status::NotImplemented("Chunked column conversion NYI");
- }
-
- auto chunk = data->chunk(0);
- RETURN_NOT_OK(ConvertValues<TYPE>(chunk));
+ RETURN_NOT_OK(ConvertValues<TYPE>(data));
*out = reinterpret_cast<PyObject*>(out_);
+
return Status::OK();
}
@@ -654,27 +650,48 @@ class ArrowDeserializer {
}
template <int T2>
+ Status ConvertValuesZeroCopy(std::shared_ptr<Array> arr) {
+ typedef typename arrow_traits<T2>::T T;
+
+ auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+ auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+
+ // Zero-Copy. We can pass the data pointer directly to NumPy.
+ void* data = const_cast<T*>(in_values);
+ int type = arrow_traits<TYPE>::npy_type;
+ RETURN_NOT_OK(OutputFromData(type, data));
+
+ return Status::OK();
+ }
+
+ template <int T2>
inline typename std::enable_if<
arrow_traits<T2>::is_pandas_numeric_nullable, Status>::type
- ConvertValues(const std::shared_ptr<Array>& arr) {
+ ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
typedef typename arrow_traits<T2>::T T;
+ size_t chunk_offset = 0;
- arrow::PrimitiveArray* prim_arr = static_cast<arrow::PrimitiveArray*>(
- arr.get());
- const T* in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+ if (data->num_chunks() == 1 && data->null_count() == 0) {
+ return ConvertValuesZeroCopy<TYPE>(data->chunk(0));
+ }
+
+ RETURN_NOT_OK(AllocateOutput(arrow_traits<T2>::npy_type));
- if (arr->null_count() > 0) {
- RETURN_NOT_OK(AllocateOutput(arrow_traits<T2>::npy_type));
+ for (int c = 0; c < data->num_chunks(); c++) {
+ const std::shared_ptr<Array> arr = data->chunk(c);
+ auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+ auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+ auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)) + chunk_offset;
- T* out_values = reinterpret_cast<T*>(PyArray_DATA(out_));
- for (int64_t i = 0; i < arr->length(); ++i) {
- out_values[i] = arr->IsNull(i) ? arrow_traits<T2>::na_value : in_values[i];
+ if (arr->null_count() > 0) {
+ for (int64_t i = 0; i < arr->length(); ++i) {
+ out_values[i] = arr->IsNull(i) ? arrow_traits<T2>::na_value : in_values[i];
+ }
+ } else {
+ memcpy(out_values, in_values, sizeof(T) * arr->length());
}
- } else {
- // Zero-Copy. We can pass the data pointer directly to NumPy.
- void* data = const_cast<T*>(in_values);
- int type = arrow_traits<TYPE>::npy_type;
- RETURN_NOT_OK(OutputFromData(type, data));
+
+ chunk_offset += arr->length();
}
return Status::OK();
@@ -684,27 +701,43 @@ class ArrowDeserializer {
template <int T2>
inline typename std::enable_if<
arrow_traits<T2>::is_pandas_numeric_not_nullable, Status>::type
- ConvertValues(const std::shared_ptr<Array>& arr) {
+ ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
typedef typename arrow_traits<T2>::T T;
+ size_t chunk_offset = 0;
- arrow::PrimitiveArray* prim_arr = static_cast<arrow::PrimitiveArray*>(
- arr.get());
-
- const T* in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+ if (data->num_chunks() == 1 && data->null_count() == 0) {
+ return ConvertValuesZeroCopy<TYPE>(data->chunk(0));
+ }
- if (arr->null_count() > 0) {
+ if (data->null_count() > 0) {
RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64));
- // Upcast to double, set NaN as appropriate
- double* out_values = reinterpret_cast<double*>(PyArray_DATA(out_));
- for (int i = 0; i < arr->length(); ++i) {
- out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i];
+ for (int c = 0; c < data->num_chunks(); c++) {
+ const std::shared_ptr<Array> arr = data->chunk(c);
+ auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+ auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+ // Upcast to double, set NaN as appropriate
+ auto out_values = reinterpret_cast<double*>(PyArray_DATA(out_)) + chunk_offset;
+
+ for (int i = 0; i < arr->length(); ++i) {
+ out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i];
+ }
+
+ chunk_offset += arr->length();
}
} else {
- // Zero-Copy. We can pass the data pointer directly to NumPy.
- void* data = const_cast<T*>(in_values);
- int type = arrow_traits<TYPE>::npy_type;
- RETURN_NOT_OK(OutputFromData(type, data));
+ RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
+
+ for (int c = 0; c < data->num_chunks(); c++) {
+ const std::shared_ptr<Array> arr = data->chunk(c);
+ auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+ auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+ auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)) + chunk_offset;
+
+ memcpy(out_values, in_values, sizeof(T) * arr->length());
+
+ chunk_offset += arr->length();
+ }
}
return Status::OK();
@@ -714,35 +747,48 @@ class ArrowDeserializer {
template <int T2>
inline typename std::enable_if<
arrow_traits<T2>::is_boolean, Status>::type
- ConvertValues(const std::shared_ptr<Array>& arr) {
+ ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
+ size_t chunk_offset = 0;
PyAcquireGIL lock;
- arrow::BooleanArray* bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
-
- if (arr->null_count() > 0) {
+ if (data->null_count() > 0) {
RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
- PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
- for (int64_t i = 0; i < arr->length(); ++i) {
- if (bool_arr->IsNull(i)) {
- Py_INCREF(Py_None);
- out_values[i] = Py_None;
- } else if (bool_arr->Value(i)) {
- // True
- Py_INCREF(Py_True);
- out_values[i] = Py_True;
- } else {
- // False
- Py_INCREF(Py_False);
- out_values[i] = Py_False;
+ for (int c = 0; c < data->num_chunks(); c++) {
+ const std::shared_ptr<Array> arr = data->chunk(c);
+ auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
+ auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)) + chunk_offset;
+
+ for (int64_t i = 0; i < arr->length(); ++i) {
+ if (bool_arr->IsNull(i)) {
+ Py_INCREF(Py_None);
+ out_values[i] = Py_None;
+ } else if (bool_arr->Value(i)) {
+ // True
+ Py_INCREF(Py_True);
+ out_values[i] = Py_True;
+ } else {
+ // False
+ Py_INCREF(Py_False);
+ out_values[i] = Py_False;
+ }
}
+
+ chunk_offset += bool_arr->length();
}
} else {
RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
- uint8_t* out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(out_));
- for (int64_t i = 0; i < arr->length(); ++i) {
- out_values[i] = static_cast<uint8_t>(bool_arr->Value(i));
+ for (int c = 0; c < data->num_chunks(); c++) {
+ const std::shared_ptr<Array> arr = data->chunk(c);
+ auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
+ auto out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(out_)) + chunk_offset;
+
+ for (int64_t i = 0; i < arr->length(); ++i) {
+ out_values[i] = static_cast<uint8_t>(bool_arr->Value(i));
+ }
+
+ chunk_offset += bool_arr->length();
}
}
@@ -753,42 +799,49 @@ class ArrowDeserializer {
template <int T2>
inline typename std::enable_if<
T2 == arrow::Type::STRING, Status>::type
- ConvertValues(const std::shared_ptr<Array>& arr) {
+ ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
+ size_t chunk_offset = 0;
PyAcquireGIL lock;
RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
- PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
-
- arrow::StringArray* string_arr = static_cast<arrow::StringArray*>(arr.get());
-
- const uint8_t* data;
- int32_t length;
- if (arr->null_count() > 0) {
- for (int64_t i = 0; i < arr->length(); ++i) {
- if (string_arr->IsNull(i)) {
- Py_INCREF(Py_None);
- out_values[i] = Py_None;
- } else {
- data = string_arr->GetValue(i, &length);
-
- out_values[i] = make_pystring(data, length);
+ for (int c = 0; c < data->num_chunks(); c++) {
+ const std::shared_ptr<Array> arr = data->chunk(c);
+ auto string_arr = static_cast<arrow::StringArray*>(arr.get());
+ auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)) + chunk_offset;
+
+ const uint8_t* data_ptr;
+ int32_t length;
+ if (data->null_count() > 0) {
+ for (int64_t i = 0; i < arr->length(); ++i) {
+ if (string_arr->IsNull(i)) {
+ Py_INCREF(Py_None);
+ out_values[i] = Py_None;
+ } else {
+ data_ptr = string_arr->GetValue(i, &length);
+
+ out_values[i] = make_pystring(data_ptr, length);
+ if (out_values[i] == nullptr) {
+ return Status::UnknownError("String initialization failed");
+ }
+ }
+ }
+ } else {
+ for (int64_t i = 0; i < arr->length(); ++i) {
+ data_ptr = string_arr->GetValue(i, &length);
+ out_values[i] = make_pystring(data_ptr, length);
if (out_values[i] == nullptr) {
return Status::UnknownError("String initialization failed");
}
}
}
- } else {
- for (int64_t i = 0; i < arr->length(); ++i) {
- data = string_arr->GetValue(i, &length);
- out_values[i] = make_pystring(data, length);
- if (out_values[i] == nullptr) {
- return Status::UnknownError("String initialization failed");
- }
- }
+
+ chunk_offset += string_arr->length();
}
+
return Status::OK();
}
+
private:
std::shared_ptr<Column> col_;
PyObject* py_ref_;