You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/10/12 03:08:56 UTC

arrow git commit: ARROW-332: Add RecordBatch.to_pandas method

Repository: arrow
Updated Branches:
  refs/heads/master caa843bda -> 3919a2778


ARROW-332: Add RecordBatch.to_pandas method

This makes testing and IPC data wrangling a little easier.

Author: Wes McKinney <we...@twosigma.com>

Closes #165 from wesm/ARROW-332 and squashes the following commits:

5f19b97 [Wes McKinney] Add simple arrow::Array->NumPy-for-pandas conversion helper and RecordBatch.to_pandas


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/3919a277
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/3919a277
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/3919a277

Branch: refs/heads/master
Commit: 3919a277884cf504fdca5d730cf128e36db6f700
Parents: caa843b
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Oct 11 23:08:48 2016 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Oct 11 23:08:48 2016 -0400

----------------------------------------------------------------------
 python/pyarrow/includes/pyarrow.pxd   |  7 +++--
 python/pyarrow/io.pyx                 | 12 +++++++++
 python/pyarrow/table.pyx              | 25 ++++++++++++++++--
 python/pyarrow/tests/test_ipc.py      | 40 ++++++++++++++++++++++++++---
 python/pyarrow/tests/test_table.py    | 41 ++++++++++++++++++++++--------
 python/src/pyarrow/adapters/pandas.cc | 19 ++++++++++++--
 python/src/pyarrow/adapters/pandas.h  |  7 ++++-
 python/src/pyarrow/common.h           |  4 +--
 python/src/pyarrow/io.cc              |  2 +-
 9 files changed, 133 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/includes/pyarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
index 2fa5a7d..7c47f21 100644
--- a/python/pyarrow/includes/pyarrow.pxd
+++ b/python/pyarrow/includes/pyarrow.pxd
@@ -50,8 +50,11 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
     PyStatus PandasMaskedToArrow(MemoryPool* pool, object ao, object mo,
                                  shared_ptr[CArray]* out)
 
-    PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref,
-                           PyObject** out)
+    PyStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
+                                  object py_ref, PyObject** out)
+
+    PyStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr,
+                                   object py_ref, PyObject** out)
 
     MemoryPool* get_memory_pool()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 00a492f..8970e06 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -230,6 +230,18 @@ cdef class InMemoryOutputStream(NativeFile):
         return result
 
 
+cdef class BufferReader(NativeFile):
+    cdef:
+        Buffer buffer
+
+    def __cinit__(self, Buffer buffer):
+        self.buffer = buffer
+        self.rd_file.reset(new CBufferReader(buffer.buffer.get().data(),
+                                             buffer.buffer.get().size()))
+        self.is_readonly = 1
+        self.is_open = True
+
+
 def buffer_from_bytes(object obj):
     """
     Construct an Arrow buffer from a Python bytes object

http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index a1cadcd..9695712 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -100,7 +100,7 @@ cdef class Column:
 
         import pandas as pd
 
-        check_status(pyarrow.ArrowToPandas(self.sp_column, self, &arr))
+        check_status(pyarrow.ConvertColumnToPandas(self.sp_column, self, &arr))
         return pd.Series(<object>arr, name=self.name)
 
     cdef _check_nullptr(self):
@@ -233,6 +233,27 @@ cdef class RecordBatch:
 
         return self.batch.Equals(deref(other.batch))
 
+    def to_pandas(self):
+        """
+        Convert the arrow::RecordBatch to a pandas DataFrame
+        """
+        cdef:
+            PyObject* np_arr
+            shared_ptr[CArray] arr
+            Column column
+
+        import pandas as pd
+
+        names = []
+        data = []
+        for i in range(self.batch.num_columns()):
+            arr = self.batch.column(i)
+            check_status(pyarrow.ConvertArrayToPandas(arr, self, &np_arr))
+            names.append(frombytes(self.batch.column_name(i)))
+            data.append(<object> np_arr)
+
+        return pd.DataFrame(dict(zip(names, data)), columns=names)
+
     @classmethod
     def from_pandas(cls, df):
         """
@@ -354,7 +375,7 @@ cdef class Table:
         for i in range(self.table.num_columns()):
             col = self.table.column(i)
             column = self.column(i)
-            check_status(pyarrow.ArrowToPandas(col, column, &arr))
+            check_status(pyarrow.ConvertColumnToPandas(col, column, &arr))
             names.append(frombytes(col.get().name()))
             data.append(<object> arr)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index b9e9e6e..14cbb30 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -18,6 +18,8 @@
 import io
 
 import numpy as np
+
+from pandas.util.testing import assert_frame_equal
 import pandas as pd
 
 import pyarrow as A
@@ -85,17 +87,40 @@ def test_ipc_file_simple_roundtrip():
     helper.run()
 
 
+def test_ipc_zero_copy_numpy():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = A.RecordBatch.from_pandas(df)
+    sink = arrow_io.InMemoryOutputStream()
+    write_file(batch, sink)
+    buffer = sink.get_result()
+    reader = arrow_io.BufferReader(buffer)
+
+    batches = read_file(reader)
+
+    data = batches[0].to_pandas()
+    rdf = pd.DataFrame(data)
+    assert_frame_equal(df, rdf)
+
+
 # XXX: For benchmarking
 
 def big_batch():
+    K = 2**4
+    N = 2**20
     df = pd.DataFrame(
-        np.random.randn(2**4, 2**20).T,
-        columns=[str(i) for i in range(2**4)]
+        np.random.randn(K, N).T,
+        columns=[str(i) for i in range(K)]
     )
 
     df = pd.concat([df] * 2 ** 3, ignore_index=True)
+    return df
+
 
-    return A.RecordBatch.from_pandas(df)
+def write_to_memory2(batch):
+    sink = arrow_io.InMemoryOutputStream()
+    write_file(batch, sink)
+    return sink.get_result()
 
 
 def write_to_memory(batch):
@@ -114,3 +139,12 @@ def read_file(source):
     reader = ipc.ArrowFileReader(source)
     return [reader.get_record_batch(i)
             for i in range(reader.num_record_batches)]
+
+# df = big_batch()
+# batch = A.RecordBatch.from_pandas(df)
+# mem = write_to_memory(batch)
+# batches = read_file(mem)
+# data = batches[0].to_pandas()
+# rdf = pd.DataFrame(data)
+
+# [x.to_pandas() for x in batches]

http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index c513032..4c9d302 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -15,28 +15,47 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pyarrow as A
+import numpy as np
+
+from pandas.util.testing import assert_frame_equal
+import pandas as pd
+
+import pyarrow as pa
 
 
 def test_recordbatch_basics():
     data = [
-        A.from_pylist(range(5)),
-        A.from_pylist([-10, -5, 0, 5, 10])
+        pa.from_pylist(range(5)),
+        pa.from_pylist([-10, -5, 0, 5, 10])
     ]
 
-    batch = A.RecordBatch.from_arrays(['c0', 'c1'], data)
+    batch = pa.RecordBatch.from_arrays(['c0', 'c1'], data)
 
     assert len(batch) == 5
     assert batch.num_rows == 5
     assert batch.num_columns == len(data)
 
 
+def test_recordbatch_from_to_pandas():
+    data = pd.DataFrame({
+        'c1': np.array([1, 2, 3, 4, 5], dtype='int64'),
+        'c2': np.array([1, 2, 3, 4, 5], dtype='uint32'),
+        'c2': np.random.randn(5),
+        'c3': ['foo', 'bar', None, 'baz', 'qux'],
+        'c4': [False, True, False, True, False]
+    })
+
+    batch = pa.RecordBatch.from_pandas(data)
+    result = batch.to_pandas()
+    assert_frame_equal(data, result)
+
+
 def test_table_basics():
     data = [
-        A.from_pylist(range(5)),
-        A.from_pylist([-10, -5, 0, 5, 10])
+        pa.from_pylist(range(5)),
+        pa.from_pylist([-10, -5, 0, 5, 10])
     ]
-    table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+    table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
     assert table.name == 'table_name'
     assert len(table) == 5
     assert table.num_rows == 5
@@ -50,15 +69,15 @@ def test_table_basics():
 
 def test_table_pandas():
     data = [
-        A.from_pylist(range(5)),
-        A.from_pylist([-10, -5, 0, 5, 10])
+        pa.from_pylist(range(5)),
+        pa.from_pylist([-10, -5, 0, 5, 10])
     ]
-    table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+    table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
 
     # TODO: Use this part once from_pandas is implemented
     # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
     # df = pd.DataFrame(data)
-    # A.Table.from_pandas(df)
+    # pa.Table.from_pandas(df)
 
     df = table.to_pandas()
     assert set(df.columns) == set(('a', 'b'))

http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index ae24b7e..b2fcd37 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -21,6 +21,8 @@
 
 #include "pyarrow/numpy_interop.h"
 
+#include "pyarrow/adapters/pandas.h"
+
 #include <cmath>
 #include <cstdint>
 #include <memory>
@@ -38,6 +40,7 @@ namespace pyarrow {
 
 using arrow::Array;
 using arrow::Column;
+using arrow::Field;
 using arrow::DataType;
 namespace util = arrow::util;
 
@@ -106,7 +109,7 @@ struct npy_traits<NPY_FLOAT64> {
 
 template <>
 struct npy_traits<NPY_DATETIME> {
-  typedef double value_type;
+  typedef int64_t value_type;
   using TypeClass = arrow::TimestampType;
 
   static constexpr bool supports_nulls = true;
@@ -163,6 +166,8 @@ class ArrowSerializer {
   Status ConvertData();
 
   Status ConvertObjectStrings(std::shared_ptr<Array>* out) {
+    PyAcquireGIL lock;
+
     PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
     arrow::TypePtr string_type(new arrow::StringType());
     arrow::StringBuilder string_builder(pool_, string_type);
@@ -197,6 +202,8 @@ class ArrowSerializer {
   }
 
   Status ConvertBooleans(std::shared_ptr<Array>* out) {
+    PyAcquireGIL lock;
+
     PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
 
     int nbytes = util::bytes_for_bits(length_);
@@ -798,7 +805,15 @@ class ArrowDeserializer {
     }                                                               \
     break;
 
-Status ArrowToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
+Status ConvertArrayToPandas(const std::shared_ptr<Array>& arr, PyObject* py_ref,
+        PyObject** out) {
+  static std::string dummy_name = "dummy";
+  auto field = std::make_shared<Field>(dummy_name, arr->type());
+  auto col = std::make_shared<Column>(field, arr);
+  return ConvertColumnToPandas(col, py_ref, out);
+}
+
+Status ConvertColumnToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
         PyObject** out) {
   switch(col->type()->type) {
     FROM_ARROW_CASE(BOOL);

http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/src/pyarrow/adapters/pandas.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.h b/python/src/pyarrow/adapters/pandas.h
index c337768..141d121 100644
--- a/python/src/pyarrow/adapters/pandas.h
+++ b/python/src/pyarrow/adapters/pandas.h
@@ -31,6 +31,7 @@ namespace arrow {
 
 class Array;
 class Column;
+class MemoryPool;
 
 } // namespace arrow
 
@@ -39,7 +40,11 @@ namespace pyarrow {
 class Status;
 
 PYARROW_EXPORT
-Status ArrowToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
+Status ConvertArrayToPandas(const std::shared_ptr<arrow::Array>& arr, PyObject* py_ref,
+    PyObject** out);
+
+PYARROW_EXPORT
+Status ConvertColumnToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
     PyObject** out);
 
 PYARROW_EXPORT

http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/src/pyarrow/common.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h
index 96eed16..50c2577 100644
--- a/python/src/pyarrow/common.h
+++ b/python/src/pyarrow/common.h
@@ -120,8 +120,8 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
     Py_INCREF(arr);
 
     data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
-    size_ = PyArray_SIZE(arr_);
-    capacity_ = size_ * PyArray_DESCR(arr_)->elsize;
+    size_ = PyArray_SIZE(arr_) * PyArray_DESCR(arr_)->elsize;
+    capacity_ = size_;
   }
 
   virtual ~NumPyBuffer() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/src/pyarrow/io.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc
index 9879b34..7bf32ff 100644
--- a/python/src/pyarrow/io.cc
+++ b/python/src/pyarrow/io.cc
@@ -85,7 +85,7 @@ arrow::Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
   ARROW_RETURN_NOT_OK(CheckPyError());
 
   PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data);
-  Py_DECREF(py_data);
+  Py_XDECREF(py_data);
   Py_XDECREF(result);
   ARROW_RETURN_NOT_OK(CheckPyError());
   return arrow::Status::OK();