You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/01 15:19:47 UTC

arrow git commit: ARROW-710: [Python] Read/write with file-like Python objects from read_feather/write_feather

Repository: arrow
Updated Branches:
  refs/heads/master fd000964d -> 31a1f53f4


ARROW-710: [Python] Read/write with file-like Python objects from read_feather/write_feather

cc @jreback

Author: Wes McKinney <we...@twosigma.com>

Closes #474 from wesm/ARROW-710 and squashes the following commits:

61d7218 [Wes McKinney] Do not close OutputStream in Feather writer. Read and write to file-like Python objects


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/31a1f53f
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/31a1f53f
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/31a1f53f

Branch: refs/heads/master
Commit: 31a1f53f4990d07a337ea0b000e04df2917b6d73
Parents: fd00096
Author: Wes McKinney <we...@twosigma.com>
Authored: Sat Apr 1 11:19:40 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Apr 1 11:19:40 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/ipc/feather-test.cc        |   3 +-
 cpp/src/arrow/ipc/feather.cc             |  25 +---
 cpp/src/arrow/ipc/feather.h              |   7 +-
 python/CMakeLists.txt                    |   1 -
 python/pyarrow/_feather.pyx              | 158 --------------------------
 python/pyarrow/feather.py                |  14 +--
 python/pyarrow/includes/libarrow_ipc.pxd |  31 ++++-
 python/pyarrow/io.pyx                    | 101 +++++++++++++++-
 python/pyarrow/tests/test_feather.py     |  17 ++-
 python/setup.py                          |   1 -
 10 files changed, 160 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
index e181f69..077a44b 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -272,8 +272,7 @@ class TestTableWriter : public ::testing::Test {
     ASSERT_OK(stream_->Finish(&output_));
 
     std::shared_ptr<io::BufferReader> buffer(new io::BufferReader(output_));
-    reader_.reset(new TableReader());
-    ASSERT_OK(reader_->Open(buffer));
+    ASSERT_OK(TableReader::Open(buffer, &reader_));
   }
 
   void CheckBatch(const RecordBatch& batch) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index 5820563..e838e1f 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -401,16 +401,10 @@ TableReader::TableReader() {
 
 TableReader::~TableReader() {}
 
-Status TableReader::Open(const std::shared_ptr<io::RandomAccessFile>& source) {
-  return impl_->Open(source);
-}
-
-Status TableReader::OpenFile(
-    const std::string& abspath, std::unique_ptr<TableReader>* out) {
-  std::shared_ptr<io::MemoryMappedFile> file;
-  RETURN_NOT_OK(io::MemoryMappedFile::Open(abspath, io::FileMode::READ, &file));
+Status TableReader::Open(const std::shared_ptr<io::RandomAccessFile>& source,
+    std::unique_ptr<TableReader>* out) {
   out->reset(new TableReader());
-  return (*out)->Open(file);
+  return (*out)->impl_->Open(source);
 }
 
 bool TableReader::HasDescription() const {
@@ -517,9 +511,8 @@ class TableWriter::TableWriterImpl : public ArrayVisitor {
     // Footer: metadata length, magic bytes
     RETURN_NOT_OK(
         stream_->Write(reinterpret_cast<const uint8_t*>(&buffer_size), sizeof(uint32_t)));
-    RETURN_NOT_OK(stream_->Write(reinterpret_cast<const uint8_t*>(kFeatherMagicBytes),
-        strlen(kFeatherMagicBytes)));
-    return stream_->Close();
+    return stream_->Write(
+        reinterpret_cast<const uint8_t*>(kFeatherMagicBytes), strlen(kFeatherMagicBytes));
   }
 
   Status LoadArrayMetadata(const Array& values, ArrayMetadata* meta) {
@@ -700,14 +693,6 @@ Status TableWriter::Open(
   return (*out)->impl_->Open(stream);
 }
 
-Status TableWriter::OpenFile(
-    const std::string& abspath, std::unique_ptr<TableWriter>* out) {
-  std::shared_ptr<io::FileOutputStream> file;
-  RETURN_NOT_OK(io::FileOutputStream::Open(abspath, &file));
-  out->reset(new TableWriter());
-  return (*out)->impl_->Open(file);
-}
-
 void TableWriter::SetDescription(const std::string& desc) {
   impl_->SetDescription(desc);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather.h b/cpp/src/arrow/ipc/feather.h
index 1e4ba58..8cc8ca0 100644
--- a/cpp/src/arrow/ipc/feather.h
+++ b/cpp/src/arrow/ipc/feather.h
@@ -54,9 +54,8 @@ class ARROW_EXPORT TableReader {
   TableReader();
   ~TableReader();
 
-  Status Open(const std::shared_ptr<io::RandomAccessFile>& source);
-
-  static Status OpenFile(const std::string& abspath, std::unique_ptr<TableReader>* out);
+  static Status Open(const std::shared_ptr<io::RandomAccessFile>& source,
+      std::unique_ptr<TableReader>* out);
 
   // Optional table description
   //
@@ -86,8 +85,6 @@ class ARROW_EXPORT TableWriter {
   static Status Open(
       const std::shared_ptr<io::OutputStream>& stream, std::unique_ptr<TableWriter>* out);
 
-  static Status OpenFile(const std::string& abspath, std::unique_ptr<TableWriter>* out);
-
   void SetDescription(const std::string& desc);
   void SetNumRows(int64_t num_rows);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 35a1a89..f315d01 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -268,7 +268,6 @@ set(CYTHON_EXTENSIONS
   config
   error
   io
-  _feather
   memory
   scalar
   schema

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/_feather.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_feather.pyx b/python/pyarrow/_feather.pyx
deleted file mode 100644
index beb4aaa..0000000
--- a/python/pyarrow/_feather.pyx
+++ /dev/null
@@ -1,158 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport CArray, CColumn, CSchema, CStatus
-from pyarrow.includes.libarrow_io cimport RandomAccessFile, OutputStream
-
-from libcpp.string cimport string
-from libcpp cimport bool as c_bool
-
-cimport cpython
-
-from pyarrow.compat import frombytes, tobytes, encode_file_path
-
-from pyarrow.array cimport Array
-from pyarrow.error cimport check_status
-from pyarrow.table cimport Column
-
-cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
-
-    cdef cppclass TableWriter:
-        @staticmethod
-        CStatus Open(const shared_ptr[OutputStream]& stream,
-                     unique_ptr[TableWriter]* out)
-
-        @staticmethod
-        CStatus OpenFile(const string& abspath, unique_ptr[TableWriter]* out)
-
-        void SetDescription(const string& desc)
-        void SetNumRows(int64_t num_rows)
-
-        CStatus Append(const string& name, const CArray& values)
-        CStatus Finalize()
-
-    cdef cppclass TableReader:
-        TableReader(const shared_ptr[RandomAccessFile]& source)
-
-        @staticmethod
-        CStatus OpenFile(const string& abspath, unique_ptr[TableReader]* out)
-
-        string GetDescription()
-        c_bool HasDescription()
-
-        int64_t num_rows()
-        int64_t num_columns()
-
-        shared_ptr[CSchema] schema()
-
-        CStatus GetColumn(int i, shared_ptr[CColumn]* out)
-        c_string GetColumnName(int i)
-
-
-class FeatherError(Exception):
-    pass
-
-
-cdef class FeatherWriter:
-    cdef:
-        unique_ptr[TableWriter] writer
-
-    cdef public:
-        int64_t num_rows
-
-    def __cinit__(self):
-        self.num_rows = -1
-
-    def open(self, object dest):
-        cdef:
-            string c_name = encode_file_path(dest)
-
-        check_status(TableWriter.OpenFile(c_name, &self.writer))
-
-    def close(self):
-        if self.num_rows < 0:
-            self.num_rows = 0
-        self.writer.get().SetNumRows(self.num_rows)
-        check_status(self.writer.get().Finalize())
-
-    def write_array(self, object name, object col, object mask=None):
-        cdef Array arr
-
-        if self.num_rows >= 0:
-            if len(col) != self.num_rows:
-                raise ValueError('prior column had a different number of rows')
-        else:
-            self.num_rows = len(col)
-
-        if isinstance(col, Array):
-            arr = col
-        else:
-            arr = Array.from_pandas(col, mask=mask)
-
-        cdef c_string c_name = tobytes(name)
-
-        with nogil:
-            check_status(
-                self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
-    cdef:
-        unique_ptr[TableReader] reader
-
-    def __cinit__(self):
-        pass
-
-    def open(self, source):
-        cdef:
-            string c_name = encode_file_path(source)
-
-        check_status(TableReader.OpenFile(c_name, &self.reader))
-
-    property num_rows:
-
-        def __get__(self):
-            return self.reader.get().num_rows()
-
-    property num_columns:
-
-        def __get__(self):
-            return self.reader.get().num_columns()
-
-    def get_column_name(self, int i):
-        cdef c_string name = self.reader.get().GetColumnName(i)
-        return frombytes(name)
-
-    def get_column(self, int i):
-        if i < 0 or i >= self.num_columns:
-            raise IndexError(i)
-
-        cdef shared_ptr[CColumn] sp_column
-        with nogil:
-            check_status(self.reader.get()
-                         .GetColumn(i, &sp_column))
-
-        cdef Column col = Column()
-        col.init(sp_column)
-        return col

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/feather.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index 28424af..f87c7f3 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -20,9 +20,9 @@ from distutils.version import LooseVersion
 import pandas as pd
 
 from pyarrow.compat import pdapi
-from pyarrow._feather import FeatherError  # noqa
+from pyarrow.io import FeatherError  # noqa
 from pyarrow.table import Table
-import pyarrow._feather as ext
+import pyarrow.io as ext
 
 
 if LooseVersion(pd.__version__) < '0.17.0':
@@ -54,12 +54,12 @@ class FeatherReader(ext.FeatherReader):
         return table.to_pandas()
 
 
-def write_feather(df, path):
+def write_feather(df, dest):
     '''
     Write a pandas.DataFrame to Feather format
     '''
     writer = ext.FeatherWriter()
-    writer.open(path)
+    writer.open(dest)
 
     if isinstance(df, pd.SparseDataFrame):
         df = df.to_dense()
@@ -95,13 +95,13 @@ def write_feather(df, path):
     writer.close()
 
 
-def read_feather(path, columns=None):
+def read_feather(source, columns=None):
     """
     Read a pandas.DataFrame from Feather format
 
     Parameters
     ----------
-    path : string, path to read from
+    source : string file path, or file-like object
     columns : sequence, optional
         Only read a specific set of columns. If not provided, all columns are
         read
@@ -110,5 +110,5 @@ def read_feather(path, columns=None):
     -------
     df : pandas.DataFrame
     """
-    reader = FeatherReader(path)
+    reader = FeatherReader(source)
     return reader.read(columns=columns)

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
index 8b7d705..59fd90b 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -18,7 +18,7 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CArray, CSchema, CRecordBatch)
+from pyarrow.includes.libarrow cimport (CArray, CColumn, CSchema, CRecordBatch)
 from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream,
                                            RandomAccessFile)
 
@@ -63,3 +63,32 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         int num_record_batches()
 
         CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+
+cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
+
+    cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter":
+        @staticmethod
+        CStatus Open(const shared_ptr[OutputStream]& stream,
+                     unique_ptr[CFeatherWriter]* out)
+
+        void SetDescription(const c_string& desc)
+        void SetNumRows(int64_t num_rows)
+
+        CStatus Append(const c_string& name, const CArray& values)
+        CStatus Finalize()
+
+    cdef cppclass CFeatherReader" arrow::ipc::feather::TableReader":
+        @staticmethod
+        CStatus Open(const shared_ptr[RandomAccessFile]& file,
+                     unique_ptr[CFeatherReader]* out)
+
+        c_string GetDescription()
+        c_bool HasDescription()
+
+        int64_t num_rows()
+        int64_t num_columns()
+
+        shared_ptr[CSchema] schema()
+
+        CStatus GetColumn(int i, shared_ptr[CColumn]* out)
+        c_string GetColumnName(int i)

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index d64427a..0b27379 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -32,10 +32,11 @@ from pyarrow.includes.libarrow_ipc cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
 
 from pyarrow.compat import frombytes, tobytes, encode_file_path
+from pyarrow.array cimport Array
 from pyarrow.error cimport check_status
 from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
 from pyarrow.schema cimport Schema
-from pyarrow.table cimport (RecordBatch, batch_from_cbatch,
+from pyarrow.table cimport (Column, RecordBatch, batch_from_cbatch,
                             table_from_ctable)
 
 cimport cpython as cp
@@ -564,7 +565,9 @@ cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
 cdef get_writer(object source, shared_ptr[OutputStream]* writer):
     cdef NativeFile nf
 
-    if not isinstance(source, NativeFile) and hasattr(source, 'write'):
+    if isinstance(source, six.string_types):
+        source = OSFile(source, mode='w')
+    elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
         # Optimistically hope this is file-like
         source = PythonFileInterface(source, mode='w')
 
@@ -1047,3 +1050,97 @@ cdef class _FileReader:
             check_status(CTable.FromRecordBatches(batches, &table))
 
         return table_from_ctable(table)
+
+
+#----------------------------------------------------------------------
+# Implement legacy Feather file format
+
+
+class FeatherError(Exception):
+    pass
+
+
+cdef class FeatherWriter:
+    cdef:
+        unique_ptr[CFeatherWriter] writer
+
+    cdef public:
+        int64_t num_rows
+
+    def __cinit__(self):
+        self.num_rows = -1
+
+    def open(self, object dest):
+        cdef shared_ptr[OutputStream] sink
+        get_writer(dest, &sink)
+
+        with nogil:
+            check_status(CFeatherWriter.Open(sink, &self.writer))
+
+    def close(self):
+        if self.num_rows < 0:
+            self.num_rows = 0
+        self.writer.get().SetNumRows(self.num_rows)
+        check_status(self.writer.get().Finalize())
+
+    def write_array(self, object name, object col, object mask=None):
+        cdef Array arr
+
+        if self.num_rows >= 0:
+            if len(col) != self.num_rows:
+                raise ValueError('prior column had a different number of rows')
+        else:
+            self.num_rows = len(col)
+
+        if isinstance(col, Array):
+            arr = col
+        else:
+            arr = Array.from_pandas(col, mask=mask)
+
+        cdef c_string c_name = tobytes(name)
+
+        with nogil:
+            check_status(
+                self.writer.get().Append(c_name, deref(arr.sp_array)))
+
+
+cdef class FeatherReader:
+    cdef:
+        unique_ptr[CFeatherReader] reader
+
+    def __cinit__(self):
+        pass
+
+    def open(self, source):
+        cdef shared_ptr[RandomAccessFile] reader
+        get_reader(source, &reader)
+
+        with nogil:
+            check_status(CFeatherReader.Open(reader, &self.reader))
+
+    property num_rows:
+
+        def __get__(self):
+            return self.reader.get().num_rows()
+
+    property num_columns:
+
+        def __get__(self):
+            return self.reader.get().num_columns()
+
+    def get_column_name(self, int i):
+        cdef c_string name = self.reader.get().GetColumnName(i)
+        return frombytes(name)
+
+    def get_column(self, int i):
+        if i < 0 or i >= self.num_columns:
+            raise IndexError(i)
+
+        cdef shared_ptr[CColumn] sp_column
+        with nogil:
+            check_status(self.reader.get()
+                         .GetColumn(i, &sp_column))
+
+        cdef Column col = Column()
+        col.init(sp_column)
+        return col

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/tests/test_feather.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py
index e4b6273..dd6888f 100644
--- a/python/pyarrow/tests/test_feather.py
+++ b/python/pyarrow/tests/test_feather.py
@@ -27,7 +27,7 @@ import pyarrow as pa
 from pyarrow.compat import guid
 from pyarrow.feather import (read_feather, write_feather,
                              FeatherReader)
-from pyarrow._feather import FeatherWriter
+from pyarrow.io import FeatherWriter
 
 
 def random_path():
@@ -347,6 +347,21 @@ class TestFeatherReader(unittest.TestCase):
         df = pd.DataFrame({'ints': values[0: num_values//2]})
         self._check_pandas_roundtrip(df, path=path)
 
+    def test_filelike_objects(self):
+        from io import BytesIO
+
+        buf = BytesIO()
+
+        # the copy makes it non-strided
+        df = pd.DataFrame(np.arange(12).reshape(4, 3),
+                          columns=['a', 'b', 'c']).copy()
+        write_feather(df, buf)
+
+        buf.seek(0)
+
+        result = read_feather(buf)
+        assert_frame_equal(result, df)
+
     def test_sparse_dataframe(self):
         # GH #221
         data = {'A': [0, 1, 2],

http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 9ff0918..12b44e1 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -104,7 +104,6 @@ class build_ext(_build_ext):
         'io',
         'jemalloc',
         'memory',
-        '_feather',
         '_parquet',
         'scalar',
         'schema',