Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/16 17:14:26 UTC
arrow git commit: ARROW-881: [Python] Reconstruct Pandas DataFrame indexes using metadata
Repository: arrow
Updated Branches:
refs/heads/master e7e8d611c -> bed019743
ARROW-881: [Python] Reconstruct Pandas DataFrame indexes using metadata
cc @mrocklin
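
In short, this lets a DataFrame index survive an Arrow round trip. A
minimal sketch of the behavior (the column and index values here are made
up for illustration):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': [4, 5, 6]},
                      index=pd.Index([1, 2, 3], name='my_index'))

    # The index is stored as an extra column, and its name is recorded in
    # JSON metadata under the b'pandas' key of the schema metadata.
    table = pa.Table.from_pandas(df)

    # to_pandas() consults that metadata to rebuild the original index.
    assert table.to_pandas().index.name == 'my_index'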
Author: Phillip Cloud <cp...@gmail.com>
Closes #612 from cpcloud/ARROW-881 and squashes the following commits:
4fa679d [Phillip Cloud] Add metadata test
60f71aa [Phillip Cloud] More doc
de616e8 [Phillip Cloud] Add doc
a42a084 [Phillip Cloud] Decode metadata to utf8 because JSON
2198dc5 [Phillip Cloud] Call column_name_idx on index_columns
32c5e64 [Phillip Cloud] Add test for read_pandas subset
2fa1f16 [Phillip Cloud] Do not write index_column metadata if not requested
21a8829 [Phillip Cloud] Add docs to pq.read_pandas
c35970c [Phillip Cloud] Add test for no index written and pq.read_pandas
59477b5 [Phillip Cloud] ARROW-881: [Python] Reconstruct Pandas DataFrame indexes using custom_metadata
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/bed01974
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/bed01974
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/bed01974
Branch: refs/heads/master
Commit: bed01974321d9d1edeae9e474bd9df020b42ea10
Parents: e7e8d61
Author: Phillip Cloud <cp...@gmail.com>
Authored: Tue May 16 13:14:18 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue May 16 13:14:18 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/type-test.cc | 16 ++
cpp/src/arrow/type.cc | 11 +-
cpp/src/arrow/type.h | 7 +-
python/pyarrow/__init__.py | 3 +-
python/pyarrow/_parquet.pxd | 5 +
python/pyarrow/_parquet.pyx | 61 +++---
python/pyarrow/array.pxi | 92 +++++----
python/pyarrow/includes/common.pxd | 2 +-
python/pyarrow/includes/libarrow.pxd | 6 +-
python/pyarrow/io.pxi | 2 +-
python/pyarrow/ipc.py | 40 ++++
python/pyarrow/lib.pxd | 32 ++--
python/pyarrow/memory.pxi | 2 +-
python/pyarrow/pandas_compat.py | 104 +++++++++++
python/pyarrow/parquet.py | 63 ++++++-
python/pyarrow/table.pxi | 225 ++++++++++++++++-------
python/pyarrow/tests/pandas_examples.py | 8 +-
python/pyarrow/tests/test_convert_pandas.py | 45 +++--
python/pyarrow/tests/test_ipc.py | 51 +++++
python/pyarrow/tests/test_parquet.py | 108 +++++++++--
python/pyarrow/tests/test_table.py | 2 +-
21 files changed, 697 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/cpp/src/arrow/type-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc
index e73adec..1fbb683 100644
--- a/cpp/src/arrow/type-test.cc
+++ b/cpp/src/arrow/type-test.cc
@@ -152,6 +152,22 @@ TEST_F(TestSchema, GetFieldByName) {
ASSERT_TRUE(result == nullptr);
}
+TEST_F(TestSchema, GetFieldIndex) {
+ auto f0 = field("f0", int32());
+ auto f1 = field("f1", uint8(), false);
+ auto f2 = field("f2", utf8());
+ auto f3 = field("f3", list(int16()));
+
+ vector<shared_ptr<Field>> fields = {f0, f1, f2, f3};
+ auto schema = std::make_shared<Schema>(fields);
+
+ ASSERT_EQ(0, schema->GetFieldIndex(fields[0]->name()));
+ ASSERT_EQ(1, schema->GetFieldIndex(fields[1]->name()));
+ ASSERT_EQ(2, schema->GetFieldIndex(fields[2]->name()));
+ ASSERT_EQ(3, schema->GetFieldIndex(fields[3]->name()));
+ ASSERT_EQ(-1, schema->GetFieldIndex("not-found"));
+}
+
TEST_F(TestSchema, TestMetadataConstruction) {
auto f0 = field("f0", int32());
auto f1 = field("f1", uint8(), false);
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index afb3027..891045e 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -265,7 +265,12 @@ bool Schema::Equals(const Schema& other) const {
return true;
}
-std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {
+std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) const {
+ int64_t i = GetFieldIndex(name);
+ return i == -1 ? nullptr : fields_[i];
+}
+
+int64_t Schema::GetFieldIndex(const std::string& name) const {
if (fields_.size() > 0 && name_to_index_.size() == 0) {
for (size_t i = 0; i < fields_.size(); ++i) {
name_to_index_[fields_[i]->name()] = static_cast<int>(i);
@@ -274,9 +279,9 @@ std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {
auto it = name_to_index_.find(name);
if (it == name_to_index_.end()) {
- return nullptr;
+ return -1;
} else {
- return fields_[it->second];
+ return it->second;
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 40615f7..3e85291 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -699,7 +699,10 @@ class ARROW_EXPORT Schema {
std::shared_ptr<Field> field(int i) const { return fields_[i]; }
// Returns nullptr if name not found
- std::shared_ptr<Field> GetFieldByName(const std::string& name);
+ std::shared_ptr<Field> GetFieldByName(const std::string& name) const;
+
+ // Returns -1 if name not found
+ int64_t GetFieldIndex(const std::string& name) const;
const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; }
std::shared_ptr<const KeyValueMetadata> metadata() const { return metadata_; }
@@ -720,7 +723,7 @@ class ARROW_EXPORT Schema {
private:
std::vector<std::shared_ptr<Field>> fields_;
- std::unordered_map<std::string, int> name_to_index_;
+ mutable std::unordered_map<std::string, int> name_to_index_;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 632a443..0f34121 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -104,7 +104,8 @@ from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter,
RecordBatchStreamReader, RecordBatchStreamWriter,
open_stream,
- open_file)
+ open_file,
+ serialize_pandas, deserialize_pandas)
localfs = LocalFilesystem.get_instance()
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 9f6edc0..2f6b9a9 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -20,6 +20,7 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
CTable, CMemoryPool,
+ CKeyValueMetadata,
RandomAccessFile, OutputStream)
@@ -164,6 +165,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
unique_ptr[CRowGroupMetaData] RowGroup(int i)
const SchemaDescriptor* schema()
+ shared_ptr[const CKeyValueMetadata] key_value_metadata() const
cdef cppclass ReaderProperties:
pass
@@ -229,8 +231,11 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,
+ const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
shared_ptr[CSchema]* out)
+
CStatus ToParquetSchema(const CSchema* arrow_schema,
+ const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
shared_ptr[SchemaDescriptor]* out)
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 51bd938..77ef7ad 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -40,15 +40,15 @@ cdef class RowGroupMetaData:
cdef:
unique_ptr[CRowGroupMetaData] up_metadata
CRowGroupMetaData* metadata
- object parent
+ FileMetaData parent
def __cinit__(self):
pass
- cdef init_from_file(self, FileMetaData parent, int i):
+ cdef void init_from_file(self, FileMetaData parent, int i):
if i < 0 or i >= parent.num_row_groups:
raise IndexError('{0} out of bounds'.format(i))
- self.up_metadata = parent.metadata.RowGroup(i)
+ self.up_metadata = parent._metadata.RowGroup(i)
self.metadata = self.up_metadata.get()
self.parent = parent
@@ -80,15 +80,15 @@ cdef class RowGroupMetaData:
cdef class FileMetaData:
cdef:
shared_ptr[CFileMetaData] sp_metadata
- CFileMetaData* metadata
- object _schema
+ CFileMetaData* _metadata
+ ParquetSchema _schema
def __cinit__(self):
pass
cdef init(self, const shared_ptr[CFileMetaData]& metadata):
self.sp_metadata = metadata
- self.metadata = metadata.get()
+ self._metadata = metadata.get()
def __repr__(self):
return """{0}
@@ -116,27 +116,27 @@ cdef class FileMetaData:
property serialized_size:
def __get__(self):
- return self.metadata.size()
+ return self._metadata.size()
property num_columns:
def __get__(self):
- return self.metadata.num_columns()
+ return self._metadata.num_columns()
property num_rows:
def __get__(self):
- return self.metadata.num_rows()
+ return self._metadata.num_rows()
property num_row_groups:
def __get__(self):
- return self.metadata.num_row_groups()
+ return self._metadata.num_row_groups()
property format_version:
def __get__(self):
- cdef ParquetVersion version = self.metadata.version()
+ cdef ParquetVersion version = self._metadata.version()
if version == ParquetVersion_V1:
return '1.0'
if version == ParquetVersion_V2:
@@ -149,7 +149,7 @@ cdef class FileMetaData:
property created_by:
def __get__(self):
- return frombytes(self.metadata.created_by())
+ return frombytes(self._metadata.created_by())
def row_group(self, int i):
"""
@@ -159,14 +159,26 @@ cdef class FileMetaData:
result.init_from_file(self, i)
return result
+ property metadata:
+
+ def __get__(self):
+ cdef:
+ unordered_map[c_string, c_string] metadata
+ const CKeyValueMetadata* underlying_metadata
+ underlying_metadata = self._metadata.key_value_metadata().get()
+ if underlying_metadata != NULL:
+ underlying_metadata.ToUnorderedMap(&metadata)
+ return metadata
+ else:
+ return None
+
cdef class ParquetSchema:
cdef:
- object parent # the FileMetaData owning the SchemaDescriptor
+ FileMetaData parent # the FileMetaData owning the SchemaDescriptor
const SchemaDescriptor* schema
def __cinit__(self):
- self.parent = None
self.schema = NULL
def __repr__(self):
@@ -186,7 +198,7 @@ cdef class ParquetSchema:
cdef init_from_filemeta(self, FileMetaData container):
self.parent = container
- self.schema = container.metadata.schema()
+ self.schema = container._metadata.schema()
def __len__(self):
return self.schema.num_columns()
@@ -211,7 +223,9 @@ cdef class ParquetSchema:
shared_ptr[CSchema] sp_arrow_schema
with nogil:
- check_status(FromParquetSchema(self.schema, &sp_arrow_schema))
+ check_status(FromParquetSchema(
+ self.schema, self.parent._metadata.key_value_metadata(),
+ &sp_arrow_schema))
return pyarrow_wrap_schema(sp_arrow_schema)
@@ -232,7 +246,7 @@ cdef class ParquetSchema:
cdef class ColumnSchema:
cdef:
- object parent
+ ParquetSchema parent
const ColumnDescriptor* descr
def __cinit__(self):
@@ -463,7 +477,7 @@ cdef class ParquetReader:
"""
cdef:
FileMetaData container = self.metadata
- const CFileMetaData* metadata = container.metadata
+ const CFileMetaData* metadata = container._metadata
int i = 0
if self.column_idx_map is None:
@@ -488,12 +502,13 @@ cdef class ParquetReader:
return array
-cdef check_compression_name(name):
+cdef int check_compression_name(name) except -1:
if name.upper() not in ['NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI']:
raise ArrowException("Unsupported compression: " + name)
+ return 0
-cdef ParquetCompression compression_from_name(object name):
+cdef ParquetCompression compression_from_name(str name):
name = name.upper()
if name == "SNAPPY":
return ParquetCompression_SNAPPY
@@ -546,7 +561,7 @@ cdef class ParquetWriter:
maybe_unbox_memory_pool(memory_pool),
sink, properties, &self.writer))
- cdef _set_version(self, WriterProperties.Builder* props):
+ cdef void _set_version(self, WriterProperties.Builder* props):
if self.version is not None:
if self.version == "1.0":
props.version(ParquetVersion_V1)
@@ -555,7 +570,7 @@ cdef class ParquetWriter:
else:
raise ArrowException("Unsupported Parquet format version")
- cdef _set_compression_props(self, WriterProperties.Builder* props):
+ cdef void _set_compression_props(self, WriterProperties.Builder* props):
if isinstance(self.compression, basestring):
check_compression_name(self.compression)
props.compression(compression_from_name(self.compression))
@@ -564,7 +579,7 @@ cdef class ParquetWriter:
check_compression_name(codec)
props.compression(column, compression_from_name(codec))
- cdef _set_dictionary_props(self, WriterProperties.Builder* props):
+ cdef void _set_dictionary_props(self, WriterProperties.Builder* props):
if isinstance(self.use_dictionary, bool):
if self.use_dictionary:
props.enable_dictionary()
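
The new FileMetaData.metadata property above exposes the Parquet key-value
metadata as a dict of raw bytes (or None). A sketch of reading it back,
assuming a hypothetical file 'data.parquet' written from a DataFrame:

    import json
    import pyarrow.parquet as pq

    pf = pq.ParquetFile('data.parquet')  # hypothetical path
    kv = pf.metadata.metadata            # dict of bytes -> bytes, or None

    if kv and b'pandas' in kv:
        pandas_meta = json.loads(kv[b'pandas'].decode('utf8'))
        print(pandas_meta['index_columns'])  # e.g. ['__index_level_0__']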
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/array.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index c132269..5930de3 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -159,7 +159,7 @@ cdef class Field:
def __cinit__(self):
pass
- cdef init(self, const shared_ptr[CField]& field):
+ cdef void init(self, const shared_ptr[CField]& field):
self.sp_field = field
self.field = field.get()
self.type = pyarrow_wrap_data_type(field.get().type())
@@ -264,11 +264,11 @@ cdef class Schema:
return result
- cdef init(self, const vector[shared_ptr[CField]]& fields):
+ cdef void init(self, const vector[shared_ptr[CField]]& fields):
self.schema = new CSchema(fields)
self.sp_schema.reset(self.schema)
- cdef init_schema(self, const shared_ptr[CSchema]& schema):
+ cdef void init_schema(self, const shared_ptr[CSchema]& schema):
self.schema = schema.get()
self.sp_schema = schema
@@ -310,6 +310,9 @@ cdef class Schema:
"""
return pyarrow_wrap_field(self.schema.GetFieldByName(tobytes(name)))
+ def get_field_index(self, name):
+ return self.schema.GetFieldIndex(tobytes(name))
+
def add_metadata(self, dict metadata):
"""
Add metadata as dict of string keys and values to Schema
@@ -352,9 +355,9 @@ cdef class Schema:
return self.__str__()
-cdef box_metadata(const CKeyValueMetadata* metadata):
+cdef dict box_metadata(const CKeyValueMetadata* metadata):
cdef unordered_map[c_string, c_string] result
- if metadata != NULL:
+ if metadata != nullptr:
metadata.ToUnorderedMap(&result)
return result
else:
@@ -813,45 +816,60 @@ cdef class Date64Value(ArrayValue):
ap.Value(self.index) / 1000).date()
+cdef dict DATETIME_CONVERSION_FUNCTIONS
+
+try:
+ import pandas as pd
+except ImportError:
+ DATETIME_CONVERSION_FUNCTIONS = {
+ TimeUnit_SECOND: lambda x, tzinfo: (
+ datetime.datetime.utcfromtimestamp(x).replace(tzinfo=tzinfo)
+ ),
+ TimeUnit_MILLI: lambda x, tzinfo: (
+ datetime.datetime.utcfromtimestamp(x / 1e3).replace(tzinfo=tzinfo)
+ ),
+ TimeUnit_MICRO: lambda x, tzinfo: (
+ datetime.datetime.utcfromtimestamp(x / 1e6).replace(tzinfo=tzinfo)
+ ),
+ }
+else:
+ DATETIME_CONVERSION_FUNCTIONS = {
+ TimeUnit_SECOND: lambda x, tzinfo: pd.Timestamp(
+ x * 1000000000, tz=tzinfo, unit='ns',
+ ),
+ TimeUnit_MILLI: lambda x, tzinfo: pd.Timestamp(
+ x * 1000000, tz=tzinfo, unit='ns',
+ ),
+ TimeUnit_MICRO: lambda x, tzinfo: pd.Timestamp(
+ x * 1000, tz=tzinfo, unit='ns',
+ ),
+ TimeUnit_NANO: lambda x, tzinfo: pd.Timestamp(
+ x, tz=tzinfo, unit='ns',
+ )
+ }
+
+
cdef class TimestampValue(ArrayValue):
def as_py(self):
cdef:
CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
- CTimestampType* dtype = <CTimestampType*>ap.type().get()
- int64_t val = ap.Value(self.index)
+ CTimestampType* dtype = <CTimestampType*> ap.type().get()
+ int64_t value = ap.Value(self.index)
- timezone = None
- tzinfo = None
- if dtype.timezone().size() > 0:
- timezone = frombytes(dtype.timezone())
+ if not dtype.timezone().empty():
import pytz
- tzinfo = pytz.timezone(timezone)
+ tzinfo = pytz.timezone(frombytes(dtype.timezone()))
+ else:
+ tzinfo = None
try:
- pd = _pandas()
- if dtype.unit() == TimeUnit_SECOND:
- val = val * 1000000000
- elif dtype.unit() == TimeUnit_MILLI:
- val = val * 1000000
- elif dtype.unit() == TimeUnit_MICRO:
- val = val * 1000
- return pd.Timestamp(val, tz=tzinfo)
- except ImportError:
- if dtype.unit() == TimeUnit_SECOND:
- result = datetime.datetime.utcfromtimestamp(val)
- elif dtype.unit() == TimeUnit_MILLI:
- result = datetime.datetime.utcfromtimestamp(float(val) / 1000)
- elif dtype.unit() == TimeUnit_MICRO:
- result = datetime.datetime.utcfromtimestamp(
- float(val) / 1000000)
- else:
- # TimeUnit_NANO
- raise NotImplementedError("Cannot convert nanosecond "
- "timestamps without pandas")
- if timezone is not None:
- result = result.replace(tzinfo=tzinfo)
- return result
+ converter = DATETIME_CONVERSION_FUNCTIONS[dtype.unit()]
+ except KeyError:
+ raise ValueError(
+ 'Cannot convert nanosecond timestamps without pandas'
+ )
+ return converter(value, tzinfo=tzinfo)
cdef class FloatValue(ArrayValue):
@@ -1042,7 +1060,7 @@ def array(object sequence, DataType type=None, MemoryPool memory_pool=None):
cdef class Array:
- cdef init(self, const shared_ptr[CArray]& sp_array):
+ cdef void init(self, const shared_ptr[CArray]& sp_array):
self.sp_array = sp_array
self.ap = sp_array.get()
self.type = pyarrow_wrap_data_type(self.sp_array.get().type())
@@ -1251,7 +1269,7 @@ cdef class Array:
cdef class Tensor:
- cdef init(self, const shared_ptr[CTensor]& sp_tensor):
+ cdef void init(self, const shared_ptr[CTensor]& sp_tensor):
self.sp_tensor = sp_tensor
self.tp = sp_tensor.get()
self.type = pyarrow_wrap_data_type(self.tp.type())
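
The C++ Schema::GetFieldIndex added above is surfaced in Python as
Schema.get_field_index, returning a field's position or -1 when absent.
A small sketch:

    import pyarrow as pa

    schema = pa.schema([pa.field('f0', pa.int32()),
                        pa.field('f1', pa.string())])

    assert schema.get_field_index('f1') == 1
    assert schema.get_field_index('not-found') == -1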
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index cc3b4b6..73bfb4f 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -18,7 +18,7 @@
# distutils: language = c++
from libc.stdint cimport *
-from libcpp cimport bool as c_bool
+from libcpp cimport bool as c_bool, nullptr
from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index a7e2733..9df31c8 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -29,6 +29,7 @@ cdef extern from "arrow/util/key_value_metadata.h" namespace "arrow" nogil:
void Append(const c_string& key, const c_string& value)
void ToUnorderedMap(unordered_map[c_string, c_string]*) const
+
cdef extern from "arrow/api.h" namespace "arrow" nogil:
enum Type" arrow::Type::type":
@@ -205,7 +206,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CField] field(int i)
shared_ptr[const CKeyValueMetadata] metadata()
- shared_ptr[CField] GetFieldByName(c_string& name)
+ shared_ptr[CField] GetFieldByName(const c_string& name)
+ int64_t GetFieldIndex(const c_string& name)
int num_fields()
c_string ToString()
@@ -686,8 +688,10 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
cdef cppclass PyBytesReader(CBufferReader):
PyBytesReader(object fo)
+
cdef extern from 'arrow/python/init.h':
int arrow_init_numpy() except -1
+
cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
void set_numpy_nan(object o)
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index a153f22..23eb6ef 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -462,7 +462,7 @@ cdef class Buffer:
def __cinit__(self):
pass
- cdef init(self, const shared_ptr[CBuffer]& buffer):
+ cdef void init(self, const shared_ptr[CBuffer]& buffer):
self.buffer = buffer
self.shape[0] = self.size
self.strides[0] = <Py_ssize_t>(1)
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 8338de3..a61d746 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -17,6 +17,7 @@
# Arrow file and stream reader/writer classes, and other messaging tools
+import pyarrow as pa
import pyarrow.lib as lib
@@ -119,3 +120,42 @@ def open_file(source, footer_offset=None):
reader : RecordBatchFileReader
"""
return RecordBatchFileReader(source, footer_offset=footer_offset)
+
+
+def serialize_pandas(df):
+ """Serialize a pandas DataFrame into a buffer protocol compatible object.
+
+ Parameters
+ ----------
+ df : pandas.DataFrame
+
+ Returns
+ -------
+ buf : buffer
+ An object compatible with the buffer protocol
+ """
+ batch = pa.RecordBatch.from_pandas(df)
+ sink = pa.InMemoryOutputStream()
+ writer = pa.RecordBatchFileWriter(sink, batch.schema)
+ writer.write_batch(batch)
+ writer.close()
+ return sink.get_result()
+
+
+def deserialize_pandas(buf, nthreads=1):
+ """Deserialize a buffer protocol compatible object into a pandas DataFrame.
+
+ Parameters
+ ----------
+ buf : buffer
+ An object compatible with the buffer protocol
+ nthreads : int, optional
+ The number of threads to use to convert the buffer to a DataFrame.
+
+ Returns
+ -------
+ df : pandas.DataFrame
+ """
+ buffer_reader = pa.BufferReader(buf)
+ reader = pa.RecordBatchFileReader(buffer_reader)
+ return reader.read_all().to_pandas(nthreads=nthreads)
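
Together the two helpers give a compact Arrow-file wire format for
DataFrames. A usage sketch:

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'foo': [1.5, 1.6], 'bar': ['a', 'b']})

    buf = pa.serialize_pandas(df)    # buffer-protocol compatible object
    df2 = pa.deserialize_pandas(buf, nthreads=2)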
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/lib.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index d3d03aa..4a2ab86 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -33,7 +33,7 @@ cdef class MemoryPool:
cdef:
CMemoryPool* pool
- cdef init(self, CMemoryPool* pool)
+ cdef void init(self, CMemoryPool* pool)
cdef class LoggingMemoryPool(MemoryPool):
@@ -89,7 +89,7 @@ cdef class Field:
cdef readonly:
DataType type
- cdef init(self, const shared_ptr[CField]& field)
+ cdef void init(self, const shared_ptr[CField]& field)
cdef class Schema:
@@ -97,8 +97,8 @@ cdef class Schema:
shared_ptr[CSchema] sp_schema
CSchema* schema
- cdef init(self, const vector[shared_ptr[CField]]& fields)
- cdef init_schema(self, const shared_ptr[CSchema]& schema)
+ cdef void init(self, const vector[shared_ptr[CField]]& fields)
+ cdef void init_schema(self, const shared_ptr[CSchema]& schema)
cdef class Scalar:
@@ -155,7 +155,7 @@ cdef class Array:
cdef readonly:
DataType type
- cdef init(self, const shared_ptr[CArray]& sp_array)
+ cdef void init(self, const shared_ptr[CArray]& sp_array)
cdef getitem(self, int64_t i)
@@ -167,7 +167,7 @@ cdef class Tensor:
cdef readonly:
DataType type
- cdef init(self, const shared_ptr[CTensor]& sp_tensor)
+ cdef void init(self, const shared_ptr[CTensor]& sp_tensor)
cdef class NullArray(Array):
@@ -266,8 +266,8 @@ cdef class ChunkedArray:
shared_ptr[CChunkedArray] sp_chunked_array
CChunkedArray* chunked_array
- cdef init(self, const shared_ptr[CChunkedArray]& chunked_array)
- cdef _check_nullptr(self)
+ cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array)
+ cdef int _check_nullptr(self) except -1
cdef class Column:
@@ -275,8 +275,8 @@ cdef class Column:
shared_ptr[CColumn] sp_column
CColumn* column
- cdef init(self, const shared_ptr[CColumn]& column)
- cdef _check_nullptr(self)
+ cdef void init(self, const shared_ptr[CColumn]& column)
+ cdef int _check_nullptr(self) except -1
cdef class Table:
@@ -284,8 +284,8 @@ cdef class Table:
shared_ptr[CTable] sp_table
CTable* table
- cdef init(self, const shared_ptr[CTable]& table)
- cdef _check_nullptr(self)
+ cdef void init(self, const shared_ptr[CTable]& table)
+ cdef int _check_nullptr(self) except -1
cdef class RecordBatch:
@@ -294,8 +294,8 @@ cdef class RecordBatch:
CRecordBatch* batch
Schema _schema
- cdef init(self, const shared_ptr[CRecordBatch]& table)
- cdef _check_nullptr(self)
+ cdef void init(self, const shared_ptr[CRecordBatch]& table)
+ cdef int _check_nullptr(self) except -1
cdef class Buffer:
@@ -304,7 +304,7 @@ cdef class Buffer:
Py_ssize_t shape[1]
Py_ssize_t strides[1]
- cdef init(self, const shared_ptr[CBuffer]& buffer)
+ cdef void init(self, const shared_ptr[CBuffer]& buffer)
cdef class NativeFile:
@@ -335,3 +335,5 @@ cdef public object pyarrow_wrap_tensor(const shared_ptr[CTensor]& sp_tensor)
cdef public object pyarrow_wrap_column(const shared_ptr[CColumn]& ccolumn)
cdef public object pyarrow_wrap_table(const shared_ptr[CTable]& ctable)
cdef public object pyarrow_wrap_batch(const shared_ptr[CRecordBatch]& cbatch)
+
+cdef dict box_metadata(const CKeyValueMetadata* sp_metadata)
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/memory.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/memory.pxi b/python/pyarrow/memory.pxi
index 15d59d2..6671a01 100644
--- a/python/pyarrow/memory.pxi
+++ b/python/pyarrow/memory.pxi
@@ -21,7 +21,7 @@
cdef class MemoryPool:
- cdef init(self, CMemoryPool* pool):
+ cdef void init(self, CMemoryPool* pool):
self.pool = pool
def bytes_allocated(self):
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/pandas_compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
new file mode 100644
index 0000000..2f72d6a
--- /dev/null
+++ b/python/pyarrow/pandas_compat.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import re
+import json
+import pandas as pd
+
+import six
+
+from pyarrow.compat import PY2
+
+
+INDEX_LEVEL_NAME_REGEX = re.compile(r'^__index_level_\d+__$')
+
+
+def is_unnamed_index_level(name):
+ return INDEX_LEVEL_NAME_REGEX.match(name) is not None
+
+
+def infer_dtype(column):
+ try:
+ return pd.api.types.infer_dtype(column)
+ except AttributeError:
+ return pd.lib.infer_dtype(column)
+
+
+def get_column_metadata(column, name):
+ inferred_dtype = infer_dtype(column)
+ dtype = column.dtype
+
+ if hasattr(dtype, 'categories'):
+ extra_metadata = {
+ 'num_categories': len(column.cat.categories),
+ 'ordered': column.cat.ordered,
+ }
+ elif hasattr(dtype, 'tz'):
+ extra_metadata = {'timezone': str(dtype.tz)}
+ else:
+ extra_metadata = None
+
+ if not isinstance(name, six.string_types):
+ raise TypeError(
+ 'Column name must be a string. Got column {} of type {}'.format(
+ name, type(name).__name__
+ )
+ )
+
+ return {
+ 'name': name,
+ 'pandas_type': {
+ 'string': 'bytes' if PY2 else 'unicode',
+ 'datetime64': (
+ 'datetimetz' if hasattr(dtype, 'tz')
+ else 'datetime'
+ ),
+ 'integer': str(dtype),
+ 'floating': str(dtype),
+ }.get(inferred_dtype, inferred_dtype),
+ 'numpy_dtype': str(dtype),
+ 'metadata': extra_metadata,
+ }
+
+
+def index_level_name(index, i):
+ return index.name or '__index_level_{:d}__'.format(i)
+
+
+def construct_metadata(df, index_levels, preserve_index):
+ return {
+ b'pandas': json.dumps(
+ {
+ 'index_columns': [
+ index_level_name(level, i)
+ for i, level in enumerate(index_levels)
+ ] if preserve_index else [],
+ 'columns': [
+ get_column_metadata(df[name], name=name)
+ for name in df.columns
+ ] + (
+ [
+ get_column_metadata(
+ level, name=index_level_name(level, i)
+ )
+ for i, level in enumerate(index_levels)
+ ] if preserve_index else []
+ ),
+ 'pandas_version': pd.__version__,
+ }
+ ).encode('utf8')
+ }
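
A sketch of what construct_metadata produces for a DataFrame with a
default, unnamed RangeIndex (the column name 'a' is illustrative):

    import json
    import pandas as pd
    import pyarrow.pandas_compat as pdcompat

    df = pd.DataFrame({'a': [1, 2, 3]})
    meta = pdcompat.construct_metadata(df, [df.index], preserve_index=True)
    decoded = json.loads(meta[b'pandas'].decode('utf8'))

    # decoded['index_columns'] == ['__index_level_0__']
    # decoded['columns'] has one entry for 'a' and one for the index level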
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 050ec31..e69d85e 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -15,6 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+import itertools
+import json
+
import six
import numpy as np
@@ -101,14 +104,31 @@ class ParquetFile(object):
column_indices = self._get_column_indices(columns)
if nthreads is not None:
self.reader.set_num_threads(nthreads)
+
+ return self.reader.read_all(column_indices=column_indices)
+
+ def read_pandas(self, columns=None, nthreads=1):
+ column_indices = self._get_column_indices(columns)
+ custom_metadata = self.metadata.metadata
+
+ if custom_metadata and b'pandas' in custom_metadata:
+ index_columns = json.loads(
+ custom_metadata[b'pandas'].decode('utf8')
+ )['index_columns']
+ else:
+ index_columns = []
+
+ if column_indices is not None and index_columns:
+ column_indices += map(self.reader.column_name_idx, index_columns)
+
+ if nthreads is not None:
+ self.reader.set_num_threads(nthreads)
return self.reader.read_all(column_indices=column_indices)
def _get_column_indices(self, column_names):
if column_names is None:
return None
- else:
- return [self.reader.column_name_idx(column)
- for column in column_names]
+ return list(map(self.reader.column_name_idx, column_names))
# ----------------------------------------------------------------------
@@ -618,6 +638,43 @@ def read_table(source, columns=None, nthreads=1, metadata=None):
return pf.read(columns=columns, nthreads=nthreads)
+def read_pandas(source, columns=None, nthreads=1, metadata=None):
+ """
+ Read a Table from Parquet format, reconstructing the index values if
+ available.
+
+ Parameters
+ ----------
+ source: str or pyarrow.io.NativeFile
+ Location of Parquet dataset. If a string is passed, it can be a single
+ file name. For passing Python file objects or byte buffers,
+ see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
+ columns: list
+ If not None, only these columns will be read from the file.
+ nthreads : int, default 1
+ Number of columns to read in parallel. Requires that the underlying
+ file source is thread-safe.
+ metadata : FileMetaData
+ If separately computed
+
+ Returns
+ -------
+ pyarrow.Table
+ Content of the file as a Table of Columns, including DataFrame indexes
+ as Columns.
+ """
+ if is_string(source):
+ fs = LocalFilesystem.get_instance()
+ if fs.isdir(source):
+ raise NotImplementedError(
+ 'Reading a directory of Parquet files with DataFrame index '
+ 'metadata is not yet supported'
+ )
+
+ pf = ParquetFile(source, metadata=metadata)
+ return pf.read_pandas(columns=columns, nthreads=nthreads)
+
+
def write_table(table, where, row_group_size=None, version='1.0',
use_dictionary=True, compression='snappy', **kwargs):
"""
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index bd8cce4..c9915c1 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -15,8 +15,18 @@
# specific language governing permissions and limitations
# under the License.
+import json
+
from collections import OrderedDict
+try:
+ import pandas as pd
+except ImportError:
+ # The pure Python-based API works without a pandas installation
+ pass
+else:
+ import pyarrow.pandas_compat as pdcompat
+
cdef class ChunkedArray:
"""
@@ -30,14 +40,18 @@ cdef class ChunkedArray:
def __cinit__(self):
self.chunked_array = NULL
- cdef init(self, const shared_ptr[CChunkedArray]& chunked_array):
+ cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array):
self.sp_chunked_array = chunked_array
self.chunked_array = chunked_array.get()
- cdef _check_nullptr(self):
+ cdef int _check_nullptr(self) except -1:
if self.chunked_array == NULL:
- raise ReferenceError("ChunkedArray object references a NULL "
- "pointer. Not initialized.")
+ raise ReferenceError(
+ "{} object references a NULL pointer. Not initialized.".format(
+ type(self).__name__
+ )
+ )
+ return 0
def length(self):
self._check_nullptr()
@@ -111,7 +125,7 @@ cdef class Column:
def __cinit__(self):
self.column = NULL
- cdef init(self, const shared_ptr[CColumn]& column):
+ cdef void init(self, const shared_ptr[CColumn]& column):
self.sp_column = column
self.column = column.get()
@@ -142,7 +156,7 @@ cdef class Column:
check_status(libarrow.ConvertColumnToPandas(self.sp_column,
self, &out))
- return _pandas().Series(wrap_array_output(out), name=self.name)
+ return pd.Series(wrap_array_output(out), name=self.name)
def equals(self, Column other):
"""
@@ -175,14 +189,17 @@ cdef class Column:
"""
return self.data.to_pylist()
- cdef _check_nullptr(self):
+ cdef int _check_nullptr(self) except -1:
if self.column == NULL:
- raise ReferenceError("Column object references a NULL pointer."
- "Not initialized.")
+ raise ReferenceError(
+ "{} object references a NULL pointer. Not initialized.".format(
+ type(self).__name__
+ )
+ )
+ return 0
def __len__(self):
- self._check_nullptr()
- return self.column.length()
+ return self.length()
def length(self):
self._check_nullptr()
@@ -248,8 +265,9 @@ cdef class Column:
return chunked_array
-cdef shared_ptr[const CKeyValueMetadata] key_value_metadata_from_dict(
- dict metadata):
+cdef shared_ptr[const CKeyValueMetadata] unbox_metadata(dict metadata):
+ if metadata is None:
+ return <shared_ptr[const CKeyValueMetadata]> nullptr
cdef:
unordered_map[c_string, c_string] unordered_metadata = metadata
return (<shared_ptr[const CKeyValueMetadata]>
@@ -289,27 +307,45 @@ cdef int _schema_from_arrays(
else:
raise TypeError(type(arrays[0]))
- schema.reset(new CSchema(fields, key_value_metadata_from_dict(metadata)))
+ schema.reset(new CSchema(fields, unbox_metadata(metadata)))
return 0
-cdef tuple _dataframe_to_arrays(df, bint timestamps_to_ms, Schema schema):
+cdef tuple _dataframe_to_arrays(
+ df,
+ bint timestamps_to_ms,
+ Schema schema,
+ bint preserve_index
+):
cdef:
list names = []
list arrays = []
+ list index_levels = []
DataType type = None
- dict metadata = {}
+ dict metadata
+
+ if preserve_index:
+ index_levels.extend(getattr(df.index, 'levels', [df.index]))
for name in df.columns:
col = df[name]
if schema is not None:
type = schema.field_by_name(name).type
- arr = Array.from_pandas(col, type=type,
- timestamps_to_ms=timestamps_to_ms)
+ arrays.append(
+ Array.from_pandas(
+ col, type=type, timestamps_to_ms=timestamps_to_ms
+ )
+ )
names.append(name)
- arrays.append(arr)
+ for i, level in enumerate(index_levels):
+ arrays.append(
+ Array.from_pandas(level, timestamps_to_ms=timestamps_to_ms)
+ )
+ names.append(pdcompat.index_level_name(level, i))
+
+ metadata = pdcompat.construct_metadata(df, index_levels, preserve_index)
return names, arrays, metadata
@@ -327,13 +363,18 @@ cdef class RecordBatch:
self.batch = NULL
self._schema = None
- cdef init(self, const shared_ptr[CRecordBatch]& batch):
+ cdef void init(self, const shared_ptr[CRecordBatch]& batch):
self.sp_batch = batch
self.batch = batch.get()
- cdef _check_nullptr(self):
+ cdef int _check_nullptr(self) except -1:
if self.batch == NULL:
- raise ReferenceError("Object not initialized")
+ raise ReferenceError(
+ "{} object references a NULL pointer. Not initialized.".format(
+ type(self).__name__
+ )
+ )
+ return 0
def __len__(self):
self._check_nullptr()
@@ -455,22 +496,27 @@ cdef class RecordBatch:
return Table.from_batches([self]).to_pandas(nthreads=nthreads)
@classmethod
- def from_pandas(cls, df, schema=None):
+ def from_pandas(cls, df, Schema schema=None, bint preserve_index=True):
"""
Convert pandas.DataFrame to an Arrow RecordBatch
Parameters
----------
df: pandas.DataFrame
- schema: pyarrow.Schema (optional)
+ schema: pyarrow.Schema, optional
The expected schema of the RecordBatch. This can be used to
indicate the type of columns if we cannot infer it automatically.
+ preserve_index : bool, optional
+ Whether to store the index as an additional column in the resulting
+ ``RecordBatch``.
Returns
-------
pyarrow.RecordBatch
"""
- names, arrays, metadata = _dataframe_to_arrays(df, False, schema)
+ names, arrays, metadata = _dataframe_to_arrays(
+ df, False, schema, preserve_index
+ )
return cls.from_arrays(arrays, names, metadata)
@staticmethod
@@ -503,7 +549,7 @@ cdef class RecordBatch:
raise ValueError('Record batch cannot contain no arrays (for now)')
num_rows = len(arrays[0])
- _schema_from_arrays(arrays, names, metadata or {}, &schema)
+ _schema_from_arrays(arrays, names, metadata, &schema)
c_arrays.reserve(len(arrays))
for arr in arrays:
@@ -513,19 +559,55 @@ cdef class RecordBatch:
return pyarrow_wrap_batch(batch)
-cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
- cdef:
- PyObject* result_obj
- CColumn* col
- int i
-
+cdef table_to_blockmanager(const shared_ptr[CTable]& ctable, int nthreads):
import pandas.core.internals as _int
from pandas import RangeIndex, Categorical
from pyarrow.compat import DatetimeTZDtype
+ cdef:
+ Table table = pyarrow_wrap_table(ctable)
+ Table block_table = pyarrow_wrap_table(ctable)
+ Schema schema = table.schema
+
+ size_t row_count = table.num_rows
+ size_t total_columns = table.num_columns
+
+ dict metadata = schema.metadata
+ dict pandas_metadata = None
+
+ list index_columns = []
+ list index_arrays = []
+
+ if metadata is not None and b'pandas' in metadata:
+ pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
+ index_columns = pandas_metadata['index_columns']
+
+ cdef:
+ Column col
+ int64_t i
+
+ for name in index_columns:
+ i = schema.get_field_index(name)
+ if i != -1:
+ col = table.column(i)
+ index_name = None if pdcompat.is_unnamed_index_level(name) else name
+ index_arrays.append(
+ pd.Index(col.to_pandas().values, name=index_name)
+ )
+ block_table = block_table.remove_column(
+ block_table.schema.get_field_index(name)
+ )
+
+ cdef:
+ PyObject* result_obj
+ shared_ptr[CTable] c_block_table = block_table.sp_table
+
with nogil:
- check_status(libarrow.ConvertTableToPandas(table, nthreads,
- &result_obj))
+ check_status(
+ libarrow.ConvertTableToPandas(
+ c_block_table, nthreads, &result_obj
+ )
+ )
result = PyObject_to_object(result_obj)
@@ -549,12 +631,13 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
block = _int.make_block(block_arr, placement=placement)
blocks.append(block)
- names = []
- for i in range(table.get().num_columns()):
- col = table.get().column(i).get()
- names.append(frombytes(col.name()))
+ cdef list axes = [
+ [column.name for column in block_table.itercolumns()],
+ pd.MultiIndex.from_arrays(
+ index_arrays
+ ) if index_arrays else pd.RangeIndex(row_count),
+ ]
- axes = [names, RangeIndex(table.get().num_rows())]
return _int.BlockManager(blocks, axes)
@@ -572,16 +655,18 @@ cdef class Table:
self.table = NULL
def __repr__(self):
- return 'pyarrow.Table\n{0}'.format(str(self.schema))
+ return 'pyarrow.{}\n{}'.format(type(self).__name__, str(self.schema))
- cdef init(self, const shared_ptr[CTable]& table):
+ cdef void init(self, const shared_ptr[CTable]& table):
self.sp_table = table
self.table = table.get()
- cdef _check_nullptr(self):
- if self.table == NULL:
- raise ReferenceError("Table object references a NULL pointer."
- "Not initialized.")
+ cdef int _check_nullptr(self) except -1:
+ if self.table == nullptr:
+ raise ReferenceError(
+ "Table object references a NULL pointer. Not initialized."
+ )
+ return 0
def equals(self, Table other):
"""
@@ -609,22 +694,29 @@ cdef class Table:
return result
@classmethod
- def from_pandas(cls, df, timestamps_to_ms=False, schema=None):
+ def from_pandas(
+ cls,
+ df,
+ bint timestamps_to_ms=False,
+ Schema schema=None,
+ bint preserve_index=True
+ ):
"""
Convert pandas.DataFrame to an Arrow Table
Parameters
----------
- df: pandas.DataFrame
-
- timestamps_to_ms: bool
+ df : pandas.DataFrame
+ timestamps_to_ms : bool
Convert datetime columns to ms resolution. This is needed for
compatibility with other functionality like Parquet I/O which
only supports milliseconds.
-
- schema: pyarrow.Schema (optional)
+ schema : pyarrow.Schema, optional
The expected schema of the Arrow Table. This can be used to
indicate the type of columns if we cannot infer it automatically.
+ preserve_index : bool, optional
+ Whether to store the index as an additional column in the resulting
+ ``Table``.
Returns
-------
@@ -642,9 +734,12 @@ cdef class Table:
>>> pa.Table.from_pandas(df)
<pyarrow.lib.Table object at 0x7f05d1fb1b40>
"""
- names, arrays, metadata = _dataframe_to_arrays(df,
- timestamps_to_ms=timestamps_to_ms,
- schema=schema)
+ names, arrays, metadata = _dataframe_to_arrays(
+ df,
+ timestamps_to_ms=timestamps_to_ms,
+ schema=schema,
+ preserve_index=preserve_index
+ )
return cls.from_arrays(arrays, names=names, metadata=metadata)
@staticmethod
@@ -671,7 +766,7 @@ cdef class Table:
shared_ptr[CTable] table
size_t K = len(arrays)
- _schema_from_arrays(arrays, names, metadata or {}, &schema)
+ _schema_from_arrays(arrays, names, metadata, &schema)
columns.reserve(K)
@@ -734,7 +829,7 @@ cdef class Table:
nthreads = cpu_count()
mgr = table_to_blockmanager(self.sp_table, nthreads)
- return _pandas().DataFrame(mgr)
+ return pd.DataFrame(mgr)
def to_pydict(self):
"""
@@ -744,11 +839,16 @@ cdef class Table:
-------
OrderedDict
"""
- entries = []
- for i in range(self.table.num_columns()):
- name = self.column(i).name
- column = self.column(i).to_pylist()
- entries.append((name, column))
+ cdef:
+ size_t i
+ size_t num_columns = self.table.num_columns()
+ list entries = []
+ Column column
+
+ for i in range(num_columns):
+ column = self.column(i)
+ entries.append((column.name, column.to_pylist()))
+
return OrderedDict(entries)
@property
@@ -846,8 +946,7 @@ cdef class Table:
"""
Add column to Table at position. Returns new table
"""
- cdef:
- shared_ptr[CTable] c_table
+ cdef shared_ptr[CTable] c_table
with nogil:
check_status(self.table.AddColumn(i, column.sp_column, &c_table))
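
The preserve_index flag added above controls whether the index column and
metadata are written at all. A brief sketch:

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': [1, 2, 3]})

    with_index = pa.Table.from_pandas(df)  # preserve_index=True by default
    without_index = pa.Table.from_pandas(df, preserve_index=False)

    # One extra column is appended when the index is preserved; without it
    # the b'pandas' metadata records an empty index_columns list.
    assert with_index.num_columns == without_index.num_columns + 1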
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/pandas_examples.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/pandas_examples.py b/python/pyarrow/tests/pandas_examples.py
index 313a3ae..17ad4b2 100644
--- a/python/pyarrow/tests/pandas_examples.py
+++ b/python/pyarrow/tests/pandas_examples.py
@@ -23,7 +23,7 @@ import pandas as pd
import pyarrow as pa
-def dataframe_with_arrays():
+def dataframe_with_arrays(include_index=False):
"""
Dataframe with numpy array columns of every possible primitive type.
@@ -72,13 +72,15 @@ def dataframe_with_arrays():
dtype='datetime64[ms]'),
]
+ if include_index:
+ fields.append(pa.field('__index_level_0__', pa.int64()))
df = pd.DataFrame(arrays)
schema = pa.schema(fields)
return df, schema
-def dataframe_with_lists():
+def dataframe_with_lists(include_index=False):
"""
Dataframe with list columns of every possible primitive type.
@@ -113,6 +115,8 @@ def dataframe_with_lists():
[u"1", u"2", u"3"]
]
+ if include_index:
+ fields.append(pa.field('__index_level_0__', pa.int64()))
df = pd.DataFrame(arrays)
schema = pa.schema(fields)
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index be35905..ca30455 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -67,9 +67,10 @@ class TestPandasConversion(unittest.TestCase):
def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
timestamps_to_ms=False, expected_schema=None,
- check_dtype=True, schema=None):
+ check_dtype=True, schema=None,
+ check_index=False):
table = pa.Table.from_pandas(df, timestamps_to_ms=timestamps_to_ms,
- schema=schema)
+ schema=schema, preserve_index=check_index)
result = table.to_pandas(nthreads=nthreads)
if expected_schema:
assert table.schema.equals(expected_schema)
@@ -299,8 +300,11 @@ class TestPandasConversion(unittest.TestCase):
})
field = pa.field('datetime64', pa.timestamp('ms'))
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, timestamps_to_ms=True,
- expected_schema=schema)
+ self._check_pandas_roundtrip(
+ df,
+ timestamps_to_ms=True,
+ expected_schema=schema,
+ )
df = pd.DataFrame({
'datetime64': np.array([
@@ -311,8 +315,11 @@ class TestPandasConversion(unittest.TestCase):
})
field = pa.field('datetime64', pa.timestamp('ns'))
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, timestamps_to_ms=False,
- expected_schema=schema)
+ self._check_pandas_roundtrip(
+ df,
+ timestamps_to_ms=False,
+ expected_schema=schema,
+ )
def test_timestamps_notimezone_nulls(self):
df = pd.DataFrame({
@@ -324,8 +331,11 @@ class TestPandasConversion(unittest.TestCase):
})
field = pa.field('datetime64', pa.timestamp('ms'))
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, timestamps_to_ms=True,
- expected_schema=schema)
+ self._check_pandas_roundtrip(
+ df,
+ timestamps_to_ms=True,
+ expected_schema=schema,
+ )
df = pd.DataFrame({
'datetime64': np.array([
@@ -336,8 +346,11 @@ class TestPandasConversion(unittest.TestCase):
})
field = pa.field('datetime64', pa.timestamp('ns'))
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, timestamps_to_ms=False,
- expected_schema=schema)
+ self._check_pandas_roundtrip(
+ df,
+ timestamps_to_ms=False,
+ expected_schema=schema,
+ )
def test_timestamps_with_timezone(self):
df = pd.DataFrame({
@@ -370,7 +383,7 @@ class TestPandasConversion(unittest.TestCase):
None,
datetime.date(1970, 1, 1),
datetime.date(2040, 2, 26)]})
- table = pa.Table.from_pandas(df)
+ table = pa.Table.from_pandas(df, preserve_index=False)
field = pa.field('date', pa.date32())
schema = pa.schema([field])
assert table.schema.equals(schema)
@@ -446,7 +459,7 @@ class TestPandasConversion(unittest.TestCase):
def test_column_of_arrays(self):
df, schema = dataframe_with_arrays()
self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
- table = pa.Table.from_pandas(df, schema=schema)
+ table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
assert table.schema.equals(schema)
for column in df.columns:
@@ -456,7 +469,7 @@ class TestPandasConversion(unittest.TestCase):
def test_column_of_lists(self):
df, schema = dataframe_with_lists()
self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
- table = pa.Table.from_pandas(df, schema=schema)
+ table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
assert table.schema.equals(schema)
for column in df.columns:
@@ -543,7 +556,7 @@ class TestPandasConversion(unittest.TestCase):
decimal.Decimal('1234.439'),
]
})
- converted = pa.Table.from_pandas(expected)
+ converted = pa.Table.from_pandas(expected, preserve_index=False)
field = pa.field('decimals', pa.decimal(7, 3))
schema = pa.schema([field])
assert converted.schema.equals(schema)
@@ -566,7 +579,7 @@ class TestPandasConversion(unittest.TestCase):
decimal.Decimal('129534.123731'),
]
})
- converted = pa.Table.from_pandas(expected)
+ converted = pa.Table.from_pandas(expected, preserve_index=False)
field = pa.field('decimals', pa.decimal(12, 6))
schema = pa.schema([field])
assert converted.schema.equals(schema)
@@ -589,7 +602,7 @@ class TestPandasConversion(unittest.TestCase):
-decimal.Decimal('314292388910493.12343437128'),
]
})
- converted = pa.Table.from_pandas(expected)
+ converted = pa.Table.from_pandas(expected, preserve_index=False)
field = pa.field('decimals', pa.decimal(26, 11))
schema = pa.schema([field])
assert converted.schema.equals(schema)
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 994876d..eeea39a 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -240,6 +240,57 @@ def test_get_record_batch_size():
assert pa.get_record_batch_size(batch) > (N * itemsize)
+def test_pandas_serialize_round_trip():
+ index = pd.Index([1, 2, 3], name='my_index')
+ columns = ['foo', 'bar']
+ df = pd.DataFrame(
+ {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
+ index=index, columns=columns
+ )
+ buf = pa.serialize_pandas(df)
+ result = pa.deserialize_pandas(buf)
+ assert_frame_equal(result, df)
+
+
+def test_pandas_serialize_round_trip_nthreads():
+ index = pd.Index([1, 2, 3], name='my_index')
+ columns = ['foo', 'bar']
+ df = pd.DataFrame(
+ {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
+ index=index, columns=columns
+ )
+ buf = pa.serialize_pandas(df)
+ result = pa.deserialize_pandas(buf, nthreads=2)
+ assert_frame_equal(result, df)
+
+
+def test_pandas_serialize_round_trip_multi_index():
+ index1 = pd.Index([1, 2, 3], name='level_1')
+ index2 = pd.Index(list('def'), name=None)
+ index = pd.MultiIndex.from_arrays([index1, index2])
+
+ columns = ['foo', 'bar']
+ df = pd.DataFrame(
+ {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
+ index=index,
+ columns=columns,
+ )
+ buf = pa.serialize_pandas(df)
+ result = pa.deserialize_pandas(buf)
+ assert_frame_equal(result, df)
+
+
+@pytest.mark.xfail(
+ raises=TypeError,
+ reason='Non string columns are not supported',
+)
+def test_pandas_serialize_round_trip_not_string_columns():
+ df = pd.DataFrame(list(zip([1.5, 1.6, 1.7], 'abc')))
+ buf = pa.serialize_pandas(df)
+ result = pa.deserialize_pandas(buf)
+ assert_frame_equal(result, df)
+
+
def write_file(batch, sink):
writer = pa.RecordBatchFileWriter(sink, batch.schema)
writer.write_batch(batch)
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 5dbe657..db446d3 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -19,6 +19,7 @@ from os.path import join as pjoin
import datetime
import io
import os
+import json
import pytest
from pyarrow.compat import guid, u
@@ -31,15 +32,11 @@ import pandas as pd
import pandas.util.testing as tm
-try:
- import pyarrow.parquet as pq
- HAVE_PARQUET = True
-except ImportError:
- HAVE_PARQUET = False
+# Skip all parquet tests if we can't import pyarrow.parquet
+pq = pytest.importorskip('pyarrow.parquet')
-# XXX: Make Parquet tests opt-in rather than skip-if-not-build
-parquet = pytest.mark.skipif(not HAVE_PARQUET,
- reason='Parquet support not built')
+# Ignore these with pytest ... -m 'not parquet'
+parquet = pytest.mark.parquet
@parquet
@@ -91,8 +88,55 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
filename = tmpdir.join('pandas_rountrip.parquet')
arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+ assert b'pandas' in arrow_table.schema.metadata
+
pq.write_table(arrow_table, filename.strpath, version="2.0")
- table_read = pq.read_table(filename.strpath)
+ table_read = pq.read_pandas(filename.strpath)
+ assert b'pandas' in table_read.schema.metadata
+
+ assert arrow_table.schema.metadata == table_read.schema.metadata
+
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@parquet
+def test_pandas_parquet_custom_metadata(tmpdir):
+ df = alltypes_sample(size=10000)
+
+ filename = tmpdir.join('pandas_rountrip.parquet')
+ arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+ assert b'pandas' in arrow_table.schema.metadata
+
+ pq.write_table(arrow_table, filename.strpath, version="2.0")
+ pf = pq.ParquetFile(filename.strpath)
+
+ md = pf.metadata.metadata
+ assert b'pandas' in md
+
+ js = json.loads(md[b'pandas'].decode('utf8'))
+ assert js['index_columns'] == ['__index_level_0__']
+
+
+@parquet
+def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tmpdir):
+ df = alltypes_sample(size=10000)
+
+ filename = tmpdir.join('pandas_rountrip.parquet')
+ arrow_table = pa.Table.from_pandas(
+ df, timestamps_to_ms=True, preserve_index=False
+ )
+ js = json.loads(arrow_table.schema.metadata[b'pandas'].decode('utf8'))
+ assert not js['index_columns']
+
+ pq.write_table(arrow_table, filename.strpath, version="2.0")
+ table_read = pq.read_pandas(filename.strpath)
+
+ js = json.loads(table_read.schema.metadata[b'pandas'].decode('utf8'))
+ assert not js['index_columns']
+
+ assert arrow_table.schema.metadata == table_read.schema.metadata
+
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@@ -167,7 +211,6 @@ def _test_dataframe(size=10000, seed=0):
'int32': _random_integers(size, np.int32),
'int64': _random_integers(size, np.int64),
'float32': np.random.randn(size).astype(np.float32),
- 'float64': np.random.randn(size),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0,
'strings': [tm.rands(10) for i in range(size)]
@@ -188,6 +231,18 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
@parquet
+def test_read_pandas_column_subset(tmpdir):
+ df = _test_dataframe(10000)
+ arrow_table = pa.Table.from_pandas(df)
+ imos = pa.BufferOutputStream()
+ pq.write_table(arrow_table, imos, version="2.0")
+ buf = imos.get_result()
+ reader = pa.BufferReader(buf)
+ df_read = pq.read_pandas(reader, columns=['strings', 'uint8']).to_pandas()
+ tm.assert_frame_equal(df[['strings', 'uint8']], df_read)
+
+
+@parquet
def test_pandas_parquet_pyfile_roundtrip(tmpdir):
filename = tmpdir.join('pandas_pyfile_roundtrip.parquet').strpath
size = 5
@@ -270,7 +325,7 @@ def test_parquet_metadata_api():
meta = fileh.metadata
repr(meta)
assert meta.num_rows == len(df)
- assert meta.num_columns == ncols
+ assert meta.num_columns == ncols + 1 # +1 for index
assert meta.num_row_groups == 1
assert meta.format_version == '2.0'
assert 'parquet-cpp' in meta.created_by
@@ -278,7 +333,7 @@ def test_parquet_metadata_api():
# Schema
schema = fileh.schema
assert meta.schema is schema
- assert len(schema) == ncols
+ assert len(schema) == ncols + 1 # +1 for index
repr(schema)
col = schema[0]
@@ -292,7 +347,7 @@ def test_parquet_metadata_api():
assert col.logical_type == 'NONE'
with pytest.raises(IndexError):
- schema[ncols]
+ schema[ncols + 1] # +1 for index
with pytest.raises(IndexError):
schema[-1]
@@ -302,7 +357,7 @@ def test_parquet_metadata_api():
repr(rg_meta)
assert rg_meta.num_rows == len(df)
- assert rg_meta.num_columns == ncols
+ assert rg_meta.num_columns == ncols + 1 # +1 for index
@parquet
@@ -502,9 +557,22 @@ def test_read_single_row_group():
result = pa.concat_tables(row_groups)
tm.assert_frame_equal(df, result.to_pandas())
+
+@parquet
+def test_read_single_row_group_with_column_subset():
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+ a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+
+ buf = io.BytesIO()
+ pq.write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.0')
+
+ buf.seek(0)
+ pf = pq.ParquetFile(buf)
+
cols = df.columns[:2]
- row_groups = [pf.read_row_group(i, columns=cols)
- for i in range(K)]
+ row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)]
result = pa.concat_tables(row_groups)
tm.assert_frame_equal(df[cols], result.to_pandas())
@@ -696,6 +764,9 @@ def test_read_multiple_files(tmpdir):
assert result.equals(expected)
+ with pytest.raises(NotImplementedError):
+ pq.read_pandas(dirpath)
+
# Read with provided metadata
metadata = pq.ParquetFile(paths[0]).metadata
@@ -706,10 +777,11 @@ def test_read_multiple_files(tmpdir):
assert result3.equals(expected)
# Read column subset
- to_read = [result[0], result[3], result[6]]
+ to_read = [result[0], result[2], result[6], result[result.num_columns - 1]]
+
result = pa.localfs.read_parquet(
dirpath, columns=[c.name for c in to_read])
- expected = pa.Table.from_arrays(to_read)
+ expected = pa.Table.from_arrays(to_read, metadata=result.schema.metadata)
assert result.equals(expected)
# Read with multiple threads
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 72ce696..ed22011 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -142,7 +142,7 @@ def test_recordbatchlist_to_pandas():
table = pa.Table.from_batches([batch1, batch2])
result = table.to_pandas()
- data = pd.concat([data1, data2], ignore_index=True)
+ data = pd.concat([data1, data2])
assert_frame_equal(data, result)