You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/01 15:19:47 UTC
arrow git commit: ARROW-710: [Python] Read/write with file-like Python objects from read_feather/write_feather
Repository: arrow
Updated Branches:
refs/heads/master fd000964d -> 31a1f53f4
ARROW-710: [Python] Read/write with file-like Python objects from read_feather/write_feather
cc @jreback
Author: Wes McKinney <we...@twosigma.com>
Closes #474 from wesm/ARROW-710 and squashes the following commits:
61d7218 [Wes McKinney] Do not close OutputStream in Feather writer. Read and write to file-like Python objects
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/31a1f53f
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/31a1f53f
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/31a1f53f
Branch: refs/heads/master
Commit: 31a1f53f4990d07a337ea0b000e04df2917b6d73
Parents: fd00096
Author: Wes McKinney <we...@twosigma.com>
Authored: Sat Apr 1 11:19:40 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Apr 1 11:19:40 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/ipc/feather-test.cc | 3 +-
cpp/src/arrow/ipc/feather.cc | 25 +---
cpp/src/arrow/ipc/feather.h | 7 +-
python/CMakeLists.txt | 1 -
python/pyarrow/_feather.pyx | 158 --------------------------
python/pyarrow/feather.py | 14 +--
python/pyarrow/includes/libarrow_ipc.pxd | 31 ++++-
python/pyarrow/io.pyx | 101 +++++++++++++++-
python/pyarrow/tests/test_feather.py | 17 ++-
python/setup.py | 1 -
10 files changed, 160 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
index e181f69..077a44b 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -272,8 +272,7 @@ class TestTableWriter : public ::testing::Test {
ASSERT_OK(stream_->Finish(&output_));
std::shared_ptr<io::BufferReader> buffer(new io::BufferReader(output_));
- reader_.reset(new TableReader());
- ASSERT_OK(reader_->Open(buffer));
+ ASSERT_OK(TableReader::Open(buffer, &reader_));
}
void CheckBatch(const RecordBatch& batch) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index 5820563..e838e1f 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -401,16 +401,10 @@ TableReader::TableReader() {
TableReader::~TableReader() {}
-Status TableReader::Open(const std::shared_ptr<io::RandomAccessFile>& source) {
- return impl_->Open(source);
-}
-
-Status TableReader::OpenFile(
- const std::string& abspath, std::unique_ptr<TableReader>* out) {
- std::shared_ptr<io::MemoryMappedFile> file;
- RETURN_NOT_OK(io::MemoryMappedFile::Open(abspath, io::FileMode::READ, &file));
+Status TableReader::Open(const std::shared_ptr<io::RandomAccessFile>& source,
+ std::unique_ptr<TableReader>* out) {
out->reset(new TableReader());
- return (*out)->Open(file);
+ return (*out)->impl_->Open(source);
}
bool TableReader::HasDescription() const {
@@ -517,9 +511,8 @@ class TableWriter::TableWriterImpl : public ArrayVisitor {
// Footer: metadata length, magic bytes
RETURN_NOT_OK(
stream_->Write(reinterpret_cast<const uint8_t*>(&buffer_size), sizeof(uint32_t)));
- RETURN_NOT_OK(stream_->Write(reinterpret_cast<const uint8_t*>(kFeatherMagicBytes),
- strlen(kFeatherMagicBytes)));
- return stream_->Close();
+ return stream_->Write(
+ reinterpret_cast<const uint8_t*>(kFeatherMagicBytes), strlen(kFeatherMagicBytes));
}
Status LoadArrayMetadata(const Array& values, ArrayMetadata* meta) {
@@ -700,14 +693,6 @@ Status TableWriter::Open(
return (*out)->impl_->Open(stream);
}
-Status TableWriter::OpenFile(
- const std::string& abspath, std::unique_ptr<TableWriter>* out) {
- std::shared_ptr<io::FileOutputStream> file;
- RETURN_NOT_OK(io::FileOutputStream::Open(abspath, &file));
- out->reset(new TableWriter());
- return (*out)->impl_->Open(file);
-}
-
void TableWriter::SetDescription(const std::string& desc) {
impl_->SetDescription(desc);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather.h b/cpp/src/arrow/ipc/feather.h
index 1e4ba58..8cc8ca0 100644
--- a/cpp/src/arrow/ipc/feather.h
+++ b/cpp/src/arrow/ipc/feather.h
@@ -54,9 +54,8 @@ class ARROW_EXPORT TableReader {
TableReader();
~TableReader();
- Status Open(const std::shared_ptr<io::RandomAccessFile>& source);
-
- static Status OpenFile(const std::string& abspath, std::unique_ptr<TableReader>* out);
+ static Status Open(const std::shared_ptr<io::RandomAccessFile>& source,
+ std::unique_ptr<TableReader>* out);
// Optional table description
//
@@ -86,8 +85,6 @@ class ARROW_EXPORT TableWriter {
static Status Open(
const std::shared_ptr<io::OutputStream>& stream, std::unique_ptr<TableWriter>* out);
- static Status OpenFile(const std::string& abspath, std::unique_ptr<TableWriter>* out);
-
void SetDescription(const std::string& desc);
void SetNumRows(int64_t num_rows);
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 35a1a89..f315d01 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -268,7 +268,6 @@ set(CYTHON_EXTENSIONS
config
error
io
- _feather
memory
scalar
schema
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/_feather.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_feather.pyx b/python/pyarrow/_feather.pyx
deleted file mode 100644
index beb4aaa..0000000
--- a/python/pyarrow/_feather.pyx
+++ /dev/null
@@ -1,158 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport CArray, CColumn, CSchema, CStatus
-from pyarrow.includes.libarrow_io cimport RandomAccessFile, OutputStream
-
-from libcpp.string cimport string
-from libcpp cimport bool as c_bool
-
-cimport cpython
-
-from pyarrow.compat import frombytes, tobytes, encode_file_path
-
-from pyarrow.array cimport Array
-from pyarrow.error cimport check_status
-from pyarrow.table cimport Column
-
-cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
-
- cdef cppclass TableWriter:
- @staticmethod
- CStatus Open(const shared_ptr[OutputStream]& stream,
- unique_ptr[TableWriter]* out)
-
- @staticmethod
- CStatus OpenFile(const string& abspath, unique_ptr[TableWriter]* out)
-
- void SetDescription(const string& desc)
- void SetNumRows(int64_t num_rows)
-
- CStatus Append(const string& name, const CArray& values)
- CStatus Finalize()
-
- cdef cppclass TableReader:
- TableReader(const shared_ptr[RandomAccessFile]& source)
-
- @staticmethod
- CStatus OpenFile(const string& abspath, unique_ptr[TableReader]* out)
-
- string GetDescription()
- c_bool HasDescription()
-
- int64_t num_rows()
- int64_t num_columns()
-
- shared_ptr[CSchema] schema()
-
- CStatus GetColumn(int i, shared_ptr[CColumn]* out)
- c_string GetColumnName(int i)
-
-
-class FeatherError(Exception):
- pass
-
-
-cdef class FeatherWriter:
- cdef:
- unique_ptr[TableWriter] writer
-
- cdef public:
- int64_t num_rows
-
- def __cinit__(self):
- self.num_rows = -1
-
- def open(self, object dest):
- cdef:
- string c_name = encode_file_path(dest)
-
- check_status(TableWriter.OpenFile(c_name, &self.writer))
-
- def close(self):
- if self.num_rows < 0:
- self.num_rows = 0
- self.writer.get().SetNumRows(self.num_rows)
- check_status(self.writer.get().Finalize())
-
- def write_array(self, object name, object col, object mask=None):
- cdef Array arr
-
- if self.num_rows >= 0:
- if len(col) != self.num_rows:
- raise ValueError('prior column had a different number of rows')
- else:
- self.num_rows = len(col)
-
- if isinstance(col, Array):
- arr = col
- else:
- arr = Array.from_pandas(col, mask=mask)
-
- cdef c_string c_name = tobytes(name)
-
- with nogil:
- check_status(
- self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
- cdef:
- unique_ptr[TableReader] reader
-
- def __cinit__(self):
- pass
-
- def open(self, source):
- cdef:
- string c_name = encode_file_path(source)
-
- check_status(TableReader.OpenFile(c_name, &self.reader))
-
- property num_rows:
-
- def __get__(self):
- return self.reader.get().num_rows()
-
- property num_columns:
-
- def __get__(self):
- return self.reader.get().num_columns()
-
- def get_column_name(self, int i):
- cdef c_string name = self.reader.get().GetColumnName(i)
- return frombytes(name)
-
- def get_column(self, int i):
- if i < 0 or i >= self.num_columns:
- raise IndexError(i)
-
- cdef shared_ptr[CColumn] sp_column
- with nogil:
- check_status(self.reader.get()
- .GetColumn(i, &sp_column))
-
- cdef Column col = Column()
- col.init(sp_column)
- return col
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/feather.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index 28424af..f87c7f3 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -20,9 +20,9 @@ from distutils.version import LooseVersion
import pandas as pd
from pyarrow.compat import pdapi
-from pyarrow._feather import FeatherError # noqa
+from pyarrow.io import FeatherError # noqa
from pyarrow.table import Table
-import pyarrow._feather as ext
+import pyarrow.io as ext
if LooseVersion(pd.__version__) < '0.17.0':
@@ -54,12 +54,12 @@ class FeatherReader(ext.FeatherReader):
return table.to_pandas()
-def write_feather(df, path):
+def write_feather(df, dest):
'''
Write a pandas.DataFrame to Feather format
'''
writer = ext.FeatherWriter()
- writer.open(path)
+ writer.open(dest)
if isinstance(df, pd.SparseDataFrame):
df = df.to_dense()
@@ -95,13 +95,13 @@ def write_feather(df, path):
writer.close()
-def read_feather(path, columns=None):
+def read_feather(source, columns=None):
"""
Read a pandas.DataFrame from Feather format
Parameters
----------
- path : string, path to read from
+ source : string file path, or file-like object
columns : sequence, optional
Only read a specific set of columns. If not provided, all columns are
read
@@ -110,5 +110,5 @@ def read_feather(path, columns=None):
-------
df : pandas.DataFrame
"""
- reader = FeatherReader(path)
+ reader = FeatherReader(source)
return reader.read(columns=columns)
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
index 8b7d705..59fd90b 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -18,7 +18,7 @@
# distutils: language = c++
from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CArray, CSchema, CRecordBatch)
+from pyarrow.includes.libarrow cimport (CArray, CColumn, CSchema, CRecordBatch)
from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream,
RandomAccessFile)
@@ -63,3 +63,32 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
int num_record_batches()
CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+
+cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
+
+ cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter":
+ @staticmethod
+ CStatus Open(const shared_ptr[OutputStream]& stream,
+ unique_ptr[CFeatherWriter]* out)
+
+ void SetDescription(const c_string& desc)
+ void SetNumRows(int64_t num_rows)
+
+ CStatus Append(const c_string& name, const CArray& values)
+ CStatus Finalize()
+
+ cdef cppclass CFeatherReader" arrow::ipc::feather::TableReader":
+ @staticmethod
+ CStatus Open(const shared_ptr[RandomAccessFile]& file,
+ unique_ptr[CFeatherReader]* out)
+
+ c_string GetDescription()
+ c_bool HasDescription()
+
+ int64_t num_rows()
+ int64_t num_columns()
+
+ shared_ptr[CSchema] schema()
+
+ CStatus GetColumn(int i, shared_ptr[CColumn]* out)
+ c_string GetColumnName(int i)
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index d64427a..0b27379 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -32,10 +32,11 @@ from pyarrow.includes.libarrow_ipc cimport *
cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.compat import frombytes, tobytes, encode_file_path
+from pyarrow.array cimport Array
from pyarrow.error cimport check_status
from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
from pyarrow.schema cimport Schema
-from pyarrow.table cimport (RecordBatch, batch_from_cbatch,
+from pyarrow.table cimport (Column, RecordBatch, batch_from_cbatch,
table_from_ctable)
cimport cpython as cp
@@ -564,7 +565,9 @@ cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
cdef get_writer(object source, shared_ptr[OutputStream]* writer):
cdef NativeFile nf
- if not isinstance(source, NativeFile) and hasattr(source, 'write'):
+ if isinstance(source, six.string_types):
+ source = OSFile(source, mode='w')
+ elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
# Optimistically hope this is file-like
source = PythonFileInterface(source, mode='w')
@@ -1047,3 +1050,97 @@ cdef class _FileReader:
check_status(CTable.FromRecordBatches(batches, &table))
return table_from_ctable(table)
+
+
+#----------------------------------------------------------------------
+# Implement legacy Feather file format
+
+
+class FeatherError(Exception):
+ pass
+
+
+cdef class FeatherWriter:
+ cdef:
+ unique_ptr[CFeatherWriter] writer
+
+ cdef public:
+ int64_t num_rows
+
+ def __cinit__(self):
+ self.num_rows = -1
+
+ def open(self, object dest):
+ cdef shared_ptr[OutputStream] sink
+ get_writer(dest, &sink)
+
+ with nogil:
+ check_status(CFeatherWriter.Open(sink, &self.writer))
+
+ def close(self):
+ if self.num_rows < 0:
+ self.num_rows = 0
+ self.writer.get().SetNumRows(self.num_rows)
+ check_status(self.writer.get().Finalize())
+
+ def write_array(self, object name, object col, object mask=None):
+ cdef Array arr
+
+ if self.num_rows >= 0:
+ if len(col) != self.num_rows:
+ raise ValueError('prior column had a different number of rows')
+ else:
+ self.num_rows = len(col)
+
+ if isinstance(col, Array):
+ arr = col
+ else:
+ arr = Array.from_pandas(col, mask=mask)
+
+ cdef c_string c_name = tobytes(name)
+
+ with nogil:
+ check_status(
+ self.writer.get().Append(c_name, deref(arr.sp_array)))
+
+
+cdef class FeatherReader:
+ cdef:
+ unique_ptr[CFeatherReader] reader
+
+ def __cinit__(self):
+ pass
+
+ def open(self, source):
+ cdef shared_ptr[RandomAccessFile] reader
+ get_reader(source, &reader)
+
+ with nogil:
+ check_status(CFeatherReader.Open(reader, &self.reader))
+
+ property num_rows:
+
+ def __get__(self):
+ return self.reader.get().num_rows()
+
+ property num_columns:
+
+ def __get__(self):
+ return self.reader.get().num_columns()
+
+ def get_column_name(self, int i):
+ cdef c_string name = self.reader.get().GetColumnName(i)
+ return frombytes(name)
+
+ def get_column(self, int i):
+ if i < 0 or i >= self.num_columns:
+ raise IndexError(i)
+
+ cdef shared_ptr[CColumn] sp_column
+ with nogil:
+ check_status(self.reader.get()
+ .GetColumn(i, &sp_column))
+
+ cdef Column col = Column()
+ col.init(sp_column)
+ return col
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/tests/test_feather.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py
index e4b6273..dd6888f 100644
--- a/python/pyarrow/tests/test_feather.py
+++ b/python/pyarrow/tests/test_feather.py
@@ -27,7 +27,7 @@ import pyarrow as pa
from pyarrow.compat import guid
from pyarrow.feather import (read_feather, write_feather,
FeatherReader)
-from pyarrow._feather import FeatherWriter
+from pyarrow.io import FeatherWriter
def random_path():
@@ -347,6 +347,21 @@ class TestFeatherReader(unittest.TestCase):
df = pd.DataFrame({'ints': values[0: num_values//2]})
self._check_pandas_roundtrip(df, path=path)
+ def test_filelike_objects(self):
+ from io import BytesIO
+
+ buf = BytesIO()
+
+ # the copy makes it non-strided
+ df = pd.DataFrame(np.arange(12).reshape(4, 3),
+ columns=['a', 'b', 'c']).copy()
+ write_feather(df, buf)
+
+ buf.seek(0)
+
+ result = read_feather(buf)
+ assert_frame_equal(result, df)
+
def test_sparse_dataframe(self):
# GH #221
data = {'A': [0, 1, 2],
http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 9ff0918..12b44e1 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -104,7 +104,6 @@ class build_ext(_build_ext):
'io',
'jemalloc',
'memory',
- '_feather',
'_parquet',
'scalar',
'schema',