You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/02 19:53:03 UTC

arrow git commit: ARROW-369: [Python] Convert multiple record batches at once to Pandas

Repository: arrow
Updated Branches:
  refs/heads/master ebe7dc8f5 -> b5de9e56d


ARROW-369: [Python] Convert multiple record batches at once to Pandas

Modified the Pandas adapter so that `ConvertColumnToPandas` handles columns with multiple chunks.  This extends the pyarrow public API with the module-level function `dataframe_from_batches` (originally described as a class `RecordBatchList` with a static method `toPandas`), which takes a list of Arrow RecordBatches with equal schemas and outputs a Pandas DataFrame.

Adds unit tests in test_table.py that run the multi-batch conversion over columns covering each typed specialization, and that check that batches with unequal schemas are rejected.

Author: Bryan Cutler <cu...@gmail.com>

Closes #216 from BryanCutler/multi-batch-toPandas-ARROW-369 and squashes the following commits:

b6c9986 [Bryan Cutler] fixed formatting
edf056e [Bryan Cutler] simplified with pyarrow.schema.Schema.equals
068bc1b [Bryan Cutler] Merge remote-tracking branch 'upstream/master' into multi-batch-toPandas-ARROW-369
da65345 [Bryan Cutler] fixed test case for schema checking
9edb0ba [Bryan Cutler] used auto keyword where some typecasting was done in ConvertValues
bd2a720 [Bryan Cutler] added testcase for schema not equal, disabled now
c3d7e8f [Bryan Cutler] Changed conversion to make Table from columns first, now conversion is now just a free function
3ee51e6 [Bryan Cutler] cleanup
398b18d [Bryan Cutler] Fixed case for Integer specialization without nulls
7b29a55 [Bryan Cutler] Initial working version of RecordBatch list to_pandas, need more tests and cleanup


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b5de9e56
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b5de9e56
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b5de9e56

Branch: refs/heads/master
Commit: b5de9e56db08480050445dd883643448af12b81b
Parents: ebe7dc8
Author: Bryan Cutler <cu...@gmail.com>
Authored: Fri Dec 2 14:34:47 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Dec 2 14:34:47 2016 -0500

----------------------------------------------------------------------
 python/pyarrow/__init__.py            |   4 +-
 python/pyarrow/includes/libarrow.pxd  |   3 +
 python/pyarrow/table.pyx              |  47 +++++++
 python/pyarrow/tests/test_table.py    |  35 +++++
 python/src/pyarrow/adapters/pandas.cc | 209 ++++++++++++++++++-----------
 5 files changed, 219 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 775ce7e..d4d0f00 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -41,5 +41,7 @@ from pyarrow.schema import (null, bool_,
                             list_, struct, field,
                             DataType, Field, Schema, schema)
 
-from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe
+from pyarrow.table import (Column, RecordBatch, dataframe_from_batches, Table,
+                           from_pandas_dataframe)
+
 from pyarrow.version import version as __version__

http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 19da408..350ebe3 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -158,6 +158,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CColumn(const shared_ptr[CField]& field,
                 const shared_ptr[CArray]& data)
 
+        CColumn(const shared_ptr[CField]& field,
+                const vector[shared_ptr[CArray]]& chunks)
+
         int64_t length()
         int64_t null_count()
         const c_string& name()

http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index a6715b1..45cf7be 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -28,6 +28,7 @@ cimport pyarrow.includes.pyarrow as pyarrow
 import pyarrow.config
 
 from pyarrow.array cimport Array, box_arrow_array
+from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
 from pyarrow.schema cimport box_data_type, box_schema
 
@@ -414,6 +415,52 @@ cdef class RecordBatch:
         return result
 
 
+def dataframe_from_batches(batches):
+    """
+    Convert a list of Arrow RecordBatches to a pandas.DataFrame
+
+    Parameters
+    ----------
+
+    batches: list of RecordBatch
+        RecordBatch list to be converted, schemas must be equal
+    """
+
+    cdef:
+        vector[shared_ptr[CArray]] c_array_chunks
+        vector[shared_ptr[CColumn]] c_columns
+        shared_ptr[CTable] c_table
+        Array arr
+        Schema schema
+
+    import pandas as pd
+
+    schema = batches[0].schema
+
+    # check schemas are equal
+    if any((not schema.equals(other.schema) for other in batches[1:])):
+        raise ArrowException("Error converting list of RecordBatches to "
+                "DataFrame, not all schemas are equal")
+
+    cdef int K = batches[0].num_columns
+
+    # create chunked columns from the batches
+    c_columns.resize(K)
+    for i in range(K):
+        for batch in batches:
+            arr = batch[i]
+            c_array_chunks.push_back(arr.sp_array)
+        c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i),
+                           c_array_chunks))
+        c_array_chunks.clear()
+
+    # create a Table from columns and convert to DataFrame
+    c_table.reset(new CTable('', schema.sp_schema, c_columns))
+    table = Table()
+    table.init(c_table)
+    return table.to_pandas()
+
+
 cdef class Table:
     """
     A collection of top-level named, equal length Arrow arrays.

http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 4c9d302..dc4f37a 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -19,6 +19,7 @@ import numpy as np
 
 from pandas.util.testing import assert_frame_equal
 import pandas as pd
+import pytest
 
 import pyarrow as pa
 
@@ -50,6 +51,40 @@ def test_recordbatch_from_to_pandas():
     assert_frame_equal(data, result)
 
 
+def test_recordbatchlist_to_pandas():
+    data1 = pd.DataFrame({
+        'c1': np.array([1, 1, 2], dtype='uint32'),
+        'c2': np.array([1.0, 2.0, 3.0], dtype='float64'),
+        'c3': [True, None, False],
+        'c4': ['foo', 'bar', None]
+    })
+
+    data2 = pd.DataFrame({
+        'c1': np.array([3, 5], dtype='uint32'),
+        'c2': np.array([4.0, 5.0], dtype='float64'),
+        'c3': [True, True],
+        'c4': ['baz', 'qux']
+    })
+
+    batch1 = pa.RecordBatch.from_pandas(data1)
+    batch2 = pa.RecordBatch.from_pandas(data2)
+
+    result = pa.dataframe_from_batches([batch1, batch2])
+    data = pd.concat([data1, data2], ignore_index=True)
+    assert_frame_equal(data, result)
+
+
+def test_recordbatchlist_schema_equals():
+    data1 = pd.DataFrame({'c1': np.array([1], dtype='uint32')})
+    data2 = pd.DataFrame({'c1': np.array([4.0, 5.0], dtype='float64')})
+
+    batch1 = pa.RecordBatch.from_pandas(data1)
+    batch2 = pa.RecordBatch.from_pandas(data2)
+
+    with pytest.raises(pa.ArrowException):
+        pa.dataframe_from_batches([batch1, batch2])
+
+
 def test_table_basics():
     data = [
         pa.from_pylist(range(5)),

http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 1f5b700..adb27e8 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -597,14 +597,10 @@ class ArrowDeserializer {
 
   Status Convert(PyObject** out) {
     const std::shared_ptr<arrow::ChunkedArray> data = col_->data();
-    if (data->num_chunks() > 1) {
-      return Status::NotImplemented("Chunked column conversion NYI");
-    }
-
-    auto chunk = data->chunk(0);
 
-    RETURN_NOT_OK(ConvertValues<TYPE>(chunk));
+    RETURN_NOT_OK(ConvertValues<TYPE>(data));
     *out = reinterpret_cast<PyObject*>(out_);
+
     return Status::OK();
   }
 
@@ -654,27 +650,48 @@ class ArrowDeserializer {
   }
 
   template <int T2>
+  Status ConvertValuesZeroCopy(std::shared_ptr<Array> arr) {
+    typedef typename arrow_traits<T2>::T T;
+
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+
+    // Zero-Copy. We can pass the data pointer directly to NumPy.
+    void* data = const_cast<T*>(in_values);
+    int type = arrow_traits<TYPE>::npy_type;
+    RETURN_NOT_OK(OutputFromData(type, data));
+
+    return Status::OK();
+  }
+
+  template <int T2>
   inline typename std::enable_if<
     arrow_traits<T2>::is_pandas_numeric_nullable, Status>::type
-  ConvertValues(const std::shared_ptr<Array>& arr) {
+  ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
     typedef typename arrow_traits<T2>::T T;
+    size_t chunk_offset = 0;
 
-    arrow::PrimitiveArray* prim_arr = static_cast<arrow::PrimitiveArray*>(
-        arr.get());
-    const T* in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    if (data->num_chunks() == 1 && data->null_count() == 0) {
+      return ConvertValuesZeroCopy<TYPE>(data->chunk(0));
+    }
+
+    RETURN_NOT_OK(AllocateOutput(arrow_traits<T2>::npy_type));
 
-    if (arr->null_count() > 0) {
-      RETURN_NOT_OK(AllocateOutput(arrow_traits<T2>::npy_type));
+    for (int c = 0; c < data->num_chunks(); c++) {
+      const std::shared_ptr<Array> arr = data->chunk(c);
+      auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+      auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+      auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)) + chunk_offset;
 
-      T* out_values = reinterpret_cast<T*>(PyArray_DATA(out_));
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        out_values[i] = arr->IsNull(i) ? arrow_traits<T2>::na_value : in_values[i];
+      if (arr->null_count() > 0) {
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          out_values[i] = arr->IsNull(i) ? arrow_traits<T2>::na_value : in_values[i];
+        }
+      } else {
+        memcpy(out_values, in_values, sizeof(T) * arr->length());
       }
-    } else {
-      // Zero-Copy. We can pass the data pointer directly to NumPy.
-      void* data = const_cast<T*>(in_values);
-      int type = arrow_traits<TYPE>::npy_type;
-      RETURN_NOT_OK(OutputFromData(type, data));
+
+      chunk_offset += arr->length();
     }
 
     return Status::OK();
@@ -684,27 +701,43 @@ class ArrowDeserializer {
   template <int T2>
   inline typename std::enable_if<
     arrow_traits<T2>::is_pandas_numeric_not_nullable, Status>::type
-  ConvertValues(const std::shared_ptr<Array>& arr) {
+  ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
     typedef typename arrow_traits<T2>::T T;
+    size_t chunk_offset = 0;
 
-    arrow::PrimitiveArray* prim_arr = static_cast<arrow::PrimitiveArray*>(
-        arr.get());
-
-    const T* in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    if (data->num_chunks() == 1 && data->null_count() == 0) {
+      return ConvertValuesZeroCopy<TYPE>(data->chunk(0));
+    }
 
-    if (arr->null_count() > 0) {
+    if (data->null_count() > 0) {
       RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64));
 
-      // Upcast to double, set NaN as appropriate
-      double* out_values = reinterpret_cast<double*>(PyArray_DATA(out_));
-      for (int i = 0; i < arr->length(); ++i) {
-        out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i];
+      for (int c = 0; c < data->num_chunks(); c++) {
+        const std::shared_ptr<Array> arr = data->chunk(c);
+        auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+        auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+        // Upcast to double, set NaN as appropriate
+        auto out_values = reinterpret_cast<double*>(PyArray_DATA(out_)) + chunk_offset;
+
+        for (int i = 0; i < arr->length(); ++i) {
+          out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i];
+        }
+
+        chunk_offset += arr->length();
       }
     } else {
-      // Zero-Copy. We can pass the data pointer directly to NumPy.
-      void* data = const_cast<T*>(in_values);
-      int type = arrow_traits<TYPE>::npy_type;
-      RETURN_NOT_OK(OutputFromData(type, data));
+      RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
+
+      for (int c = 0; c < data->num_chunks(); c++) {
+        const std::shared_ptr<Array> arr = data->chunk(c);
+        auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+        auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+        auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)) + chunk_offset;
+
+        memcpy(out_values, in_values, sizeof(T) * arr->length());
+
+        chunk_offset += arr->length();
+      }
     }
 
     return Status::OK();
@@ -714,35 +747,48 @@ class ArrowDeserializer {
   template <int T2>
   inline typename std::enable_if<
     arrow_traits<T2>::is_boolean, Status>::type
-  ConvertValues(const std::shared_ptr<Array>& arr) {
+  ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
+    size_t chunk_offset = 0;
     PyAcquireGIL lock;
 
-    arrow::BooleanArray* bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
-
-    if (arr->null_count() > 0) {
+    if (data->null_count() > 0) {
       RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
 
-      PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        if (bool_arr->IsNull(i)) {
-          Py_INCREF(Py_None);
-          out_values[i] = Py_None;
-        } else if (bool_arr->Value(i)) {
-          // True
-          Py_INCREF(Py_True);
-          out_values[i] = Py_True;
-        } else {
-          // False
-          Py_INCREF(Py_False);
-          out_values[i] = Py_False;
+      for (int c = 0; c < data->num_chunks(); c++) {
+        const std::shared_ptr<Array> arr = data->chunk(c);
+        auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
+        auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)) + chunk_offset;
+
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          if (bool_arr->IsNull(i)) {
+            Py_INCREF(Py_None);
+            out_values[i] = Py_None;
+          } else if (bool_arr->Value(i)) {
+            // True
+            Py_INCREF(Py_True);
+            out_values[i] = Py_True;
+          } else {
+            // False
+            Py_INCREF(Py_False);
+            out_values[i] = Py_False;
+          }
         }
+
+        chunk_offset += bool_arr->length();
       }
     } else {
       RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
 
-      uint8_t* out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(out_));
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        out_values[i] = static_cast<uint8_t>(bool_arr->Value(i));
+      for (int c = 0; c < data->num_chunks(); c++) {
+        const std::shared_ptr<Array> arr = data->chunk(c);
+        auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
+        auto out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(out_)) + chunk_offset;
+
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          out_values[i] = static_cast<uint8_t>(bool_arr->Value(i));
+        }
+
+        chunk_offset += bool_arr->length();
       }
     }
 
@@ -753,42 +799,49 @@ class ArrowDeserializer {
   template <int T2>
   inline typename std::enable_if<
     T2 == arrow::Type::STRING, Status>::type
-  ConvertValues(const std::shared_ptr<Array>& arr) {
+  ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
+    size_t chunk_offset = 0;
     PyAcquireGIL lock;
 
     RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
 
-    PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
-
-    arrow::StringArray* string_arr = static_cast<arrow::StringArray*>(arr.get());
-
-    const uint8_t* data;
-    int32_t length;
-    if (arr->null_count() > 0) {
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        if (string_arr->IsNull(i)) {
-          Py_INCREF(Py_None);
-          out_values[i] = Py_None;
-        } else {
-          data = string_arr->GetValue(i, &length);
-
-          out_values[i] = make_pystring(data, length);
+    for (int c = 0; c < data->num_chunks(); c++) {
+      const std::shared_ptr<Array> arr = data->chunk(c);
+      auto string_arr = static_cast<arrow::StringArray*>(arr.get());
+      auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)) + chunk_offset;
+
+      const uint8_t* data_ptr;
+      int32_t length;
+      if (data->null_count() > 0) {
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          if (string_arr->IsNull(i)) {
+            Py_INCREF(Py_None);
+            out_values[i] = Py_None;
+          } else {
+            data_ptr = string_arr->GetValue(i, &length);
+
+            out_values[i] = make_pystring(data_ptr, length);
+            if (out_values[i] == nullptr) {
+              return Status::UnknownError("String initialization failed");
+            }
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          data_ptr = string_arr->GetValue(i, &length);
+          out_values[i] = make_pystring(data_ptr, length);
           if (out_values[i] == nullptr) {
             return Status::UnknownError("String initialization failed");
           }
         }
       }
-    } else {
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        data = string_arr->GetValue(i, &length);
-        out_values[i] = make_pystring(data, length);
-        if (out_values[i] == nullptr) {
-          return Status::UnknownError("String initialization failed");
-        }
-      }
+
+      chunk_offset += string_arr->length();
     }
+
     return Status::OK();
   }
+
  private:
   std::shared_ptr<Column> col_;
   PyObject* py_ref_;