You are viewing a plain-text version of this content; the hyperlink to the canonical version (originally attached to the word "here") was not preserved in this rendering.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/10/12 03:08:56 UTC
arrow git commit: ARROW-332: Add RecordBatch.to_pandas method
Repository: arrow
Updated Branches:
refs/heads/master caa843bda -> 3919a2778
ARROW-332: Add RecordBatch.to_pandas method
This makes testing and IPC data wrangling a little easier.
Author: Wes McKinney <we...@twosigma.com>
Closes #165 from wesm/ARROW-332 and squashes the following commits:
5f19b97 [Wes McKinney] Add simple arrow::Array->NumPy-for-pandas conversion helper and RecordBatch.to_pandas
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/3919a277
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/3919a277
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/3919a277
Branch: refs/heads/master
Commit: 3919a277884cf504fdca5d730cf128e36db6f700
Parents: caa843b
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Oct 11 23:08:48 2016 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Oct 11 23:08:48 2016 -0400
----------------------------------------------------------------------
python/pyarrow/includes/pyarrow.pxd | 7 +++--
python/pyarrow/io.pyx | 12 +++++++++
python/pyarrow/table.pyx | 25 ++++++++++++++++--
python/pyarrow/tests/test_ipc.py | 40 ++++++++++++++++++++++++++---
python/pyarrow/tests/test_table.py | 41 ++++++++++++++++++++++--------
python/src/pyarrow/adapters/pandas.cc | 19 ++++++++++++--
python/src/pyarrow/adapters/pandas.h | 7 ++++-
python/src/pyarrow/common.h | 4 +--
python/src/pyarrow/io.cc | 2 +-
9 files changed, 133 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/includes/pyarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
index 2fa5a7d..7c47f21 100644
--- a/python/pyarrow/includes/pyarrow.pxd
+++ b/python/pyarrow/includes/pyarrow.pxd
@@ -50,8 +50,11 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
PyStatus PandasMaskedToArrow(MemoryPool* pool, object ao, object mo,
shared_ptr[CArray]* out)
- PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref,
- PyObject** out)
+ PyStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
+ object py_ref, PyObject** out)
+
+ PyStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr,
+ object py_ref, PyObject** out)
MemoryPool* get_memory_pool()
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 00a492f..8970e06 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -230,6 +230,18 @@ cdef class InMemoryOutputStream(NativeFile):
return result
+cdef class BufferReader(NativeFile):  # read-only NativeFile over an Arrow Buffer's memory
+    cdef:
+        Buffer buffer  # keeps the wrapped memory alive while the reader uses it
+
+    def __cinit__(self, Buffer buffer not None):  # not None: a None buffer would be dereferenced below
+        self.buffer = buffer
+        self.rd_file.reset(new CBufferReader(buffer.buffer.get().data(),
+                                             buffer.buffer.get().size()))
+        self.is_readonly = 1
+        self.is_open = True
+
+
def buffer_from_bytes(object obj):
"""
Construct an Arrow buffer from a Python bytes object
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index a1cadcd..9695712 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -100,7 +100,7 @@ cdef class Column:
import pandas as pd
- check_status(pyarrow.ArrowToPandas(self.sp_column, self, &arr))
+ check_status(pyarrow.ConvertColumnToPandas(self.sp_column, self, &arr))
return pd.Series(<object>arr, name=self.name)
cdef _check_nullptr(self):
@@ -233,6 +233,27 @@ cdef class RecordBatch:
return self.batch.Equals(deref(other.batch))
+ def to_pandas(self):
+ """
+ Convert the arrow::RecordBatch to a pandas DataFrame
+ """
+ cdef:
+ PyObject* np_arr
+ shared_ptr[CArray] arr
+ Column column
+
+ import pandas as pd
+
+ names = []
+ data = []
+ for i in range(self.batch.num_columns()):
+ arr = self.batch.column(i)
+ check_status(pyarrow.ConvertArrayToPandas(arr, self, &np_arr))
+ names.append(frombytes(self.batch.column_name(i)))
+ data.append(<object> np_arr)
+
+ return pd.DataFrame(dict(zip(names, data)), columns=names)
+
@classmethod
def from_pandas(cls, df):
"""
@@ -354,7 +375,7 @@ cdef class Table:
for i in range(self.table.num_columns()):
col = self.table.column(i)
column = self.column(i)
- check_status(pyarrow.ArrowToPandas(col, column, &arr))
+ check_status(pyarrow.ConvertColumnToPandas(col, column, &arr))
names.append(frombytes(col.get().name()))
data.append(<object> arr)
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index b9e9e6e..14cbb30 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -18,6 +18,8 @@
import io
import numpy as np
+
+from pandas.util.testing import assert_frame_equal
import pandas as pd
import pyarrow as A
@@ -85,17 +87,40 @@ def test_ipc_file_simple_roundtrip():
helper.run()
+def test_ipc_zero_copy_numpy():
+ df = pd.DataFrame({'foo': [1.5]})
+
+ batch = A.RecordBatch.from_pandas(df)
+ sink = arrow_io.InMemoryOutputStream()
+ write_file(batch, sink)
+ buffer = sink.get_result()
+ reader = arrow_io.BufferReader(buffer)
+
+ batches = read_file(reader)
+
+ data = batches[0].to_pandas()
+ rdf = pd.DataFrame(data)
+ assert_frame_equal(df, rdf)
+
+
# XXX: For benchmarking
def big_batch():
+ K = 2**4
+ N = 2**20
df = pd.DataFrame(
- np.random.randn(2**4, 2**20).T,
- columns=[str(i) for i in range(2**4)]
+ np.random.randn(K, N).T,
+ columns=[str(i) for i in range(K)]
)
df = pd.concat([df] * 2 ** 3, ignore_index=True)
+ return df
+
- return A.RecordBatch.from_pandas(df)
+def write_to_memory2(batch):
+ sink = arrow_io.InMemoryOutputStream()
+ write_file(batch, sink)
+ return sink.get_result()
def write_to_memory(batch):
@@ -114,3 +139,12 @@ def read_file(source):
reader = ipc.ArrowFileReader(source)
return [reader.get_record_batch(i)
for i in range(reader.num_record_batches)]
+
+# df = big_batch()
+# batch = A.RecordBatch.from_pandas(df)
+# mem = write_to_memory(batch)
+# batches = read_file(mem)
+# data = batches[0].to_pandas()
+# rdf = pd.DataFrame(data)
+
+# [x.to_pandas() for x in batches]
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index c513032..4c9d302 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -15,28 +15,47 @@
# specific language governing permissions and limitations
# under the License.
-import pyarrow as A
+import numpy as np
+
+from pandas.util.testing import assert_frame_equal
+import pandas as pd
+
+import pyarrow as pa
def test_recordbatch_basics():
data = [
- A.from_pylist(range(5)),
- A.from_pylist([-10, -5, 0, 5, 10])
+ pa.from_pylist(range(5)),
+ pa.from_pylist([-10, -5, 0, 5, 10])
]
- batch = A.RecordBatch.from_arrays(['c0', 'c1'], data)
+ batch = pa.RecordBatch.from_arrays(['c0', 'c1'], data)
assert len(batch) == 5
assert batch.num_rows == 5
assert batch.num_columns == len(data)
+def test_recordbatch_from_to_pandas():
+ data = pd.DataFrame({
+ 'c1': np.array([1, 2, 3, 4, 5], dtype='int64'),
+ 'c2': np.array([1, 2, 3, 4, 5], dtype='uint32'),
+ 'c2': np.random.randn(5),
+ 'c3': ['foo', 'bar', None, 'baz', 'qux'],
+ 'c4': [False, True, False, True, False]
+ })
+
+ batch = pa.RecordBatch.from_pandas(data)
+ result = batch.to_pandas()
+ assert_frame_equal(data, result)
+
+
def test_table_basics():
data = [
- A.from_pylist(range(5)),
- A.from_pylist([-10, -5, 0, 5, 10])
+ pa.from_pylist(range(5)),
+ pa.from_pylist([-10, -5, 0, 5, 10])
]
- table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+ table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
assert table.name == 'table_name'
assert len(table) == 5
assert table.num_rows == 5
@@ -50,15 +69,15 @@ def test_table_basics():
def test_table_pandas():
data = [
- A.from_pylist(range(5)),
- A.from_pylist([-10, -5, 0, 5, 10])
+ pa.from_pylist(range(5)),
+ pa.from_pylist([-10, -5, 0, 5, 10])
]
- table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+ table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
# TODO: Use this part once from_pandas is implemented
# data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
# df = pd.DataFrame(data)
- # A.Table.from_pandas(df)
+ # pa.Table.from_pandas(df)
df = table.to_pandas()
assert set(df.columns) == set(('a', 'b'))
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index ae24b7e..b2fcd37 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -21,6 +21,8 @@
#include "pyarrow/numpy_interop.h"
+#include "pyarrow/adapters/pandas.h"
+
#include <cmath>
#include <cstdint>
#include <memory>
@@ -38,6 +40,7 @@ namespace pyarrow {
using arrow::Array;
using arrow::Column;
+using arrow::Field;
using arrow::DataType;
namespace util = arrow::util;
@@ -106,7 +109,7 @@ struct npy_traits<NPY_FLOAT64> {
template <>
struct npy_traits<NPY_DATETIME> {
- typedef double value_type;
+ typedef int64_t value_type;
using TypeClass = arrow::TimestampType;
static constexpr bool supports_nulls = true;
@@ -163,6 +166,8 @@ class ArrowSerializer {
Status ConvertData();
Status ConvertObjectStrings(std::shared_ptr<Array>* out) {
+ PyAcquireGIL lock;
+
PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
arrow::TypePtr string_type(new arrow::StringType());
arrow::StringBuilder string_builder(pool_, string_type);
@@ -197,6 +202,8 @@ class ArrowSerializer {
}
Status ConvertBooleans(std::shared_ptr<Array>* out) {
+ PyAcquireGIL lock;
+
PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
int nbytes = util::bytes_for_bits(length_);
@@ -798,7 +805,15 @@ class ArrowDeserializer {
} \
break;
-Status ArrowToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
+Status ConvertArrayToPandas(const std::shared_ptr<Array>& arr, PyObject* py_ref,
+ PyObject** out) {
+ static std::string dummy_name = "dummy";
+ auto field = std::make_shared<Field>(dummy_name, arr->type());
+ auto col = std::make_shared<Column>(field, arr);
+ return ConvertColumnToPandas(col, py_ref, out);
+}
+
+Status ConvertColumnToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
PyObject** out) {
switch(col->type()->type) {
FROM_ARROW_CASE(BOOL);
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/src/pyarrow/adapters/pandas.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.h b/python/src/pyarrow/adapters/pandas.h
index c337768..141d121 100644
--- a/python/src/pyarrow/adapters/pandas.h
+++ b/python/src/pyarrow/adapters/pandas.h
@@ -31,6 +31,7 @@ namespace arrow {
class Array;
class Column;
+class MemoryPool;
} // namespace arrow
@@ -39,7 +40,11 @@ namespace pyarrow {
class Status;
PYARROW_EXPORT
-Status ArrowToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
+Status ConvertArrayToPandas(const std::shared_ptr<arrow::Array>& arr, PyObject* py_ref,
+ PyObject** out);
+
+PYARROW_EXPORT
+Status ConvertColumnToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
PyObject** out);
PYARROW_EXPORT
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/src/pyarrow/common.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h
index 96eed16..50c2577 100644
--- a/python/src/pyarrow/common.h
+++ b/python/src/pyarrow/common.h
@@ -120,8 +120,8 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
Py_INCREF(arr);
data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
- size_ = PyArray_SIZE(arr_);
- capacity_ = size_ * PyArray_DESCR(arr_)->elsize;
+ size_ = PyArray_SIZE(arr_) * PyArray_DESCR(arr_)->elsize;
+ capacity_ = size_;
}
virtual ~NumPyBuffer() {
http://git-wip-us.apache.org/repos/asf/arrow/blob/3919a277/python/src/pyarrow/io.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc
index 9879b34..7bf32ff 100644
--- a/python/src/pyarrow/io.cc
+++ b/python/src/pyarrow/io.cc
@@ -85,7 +85,7 @@ arrow::Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
ARROW_RETURN_NOT_OK(CheckPyError());
PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data);
- Py_DECREF(py_data);
+ Py_XDECREF(py_data);
Py_XDECREF(result);
ARROW_RETURN_NOT_OK(CheckPyError());
return arrow::Status::OK();