You are viewing a plain-text version of this content. The canonical version is in the Apache mailing-list archive; the original link was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/09/07 02:52:18 UTC
arrow git commit: ARROW-1435: [Python] Properly handle time zone metadata in Parquet round trips
Repository: arrow
Updated Branches:
refs/heads/master 3033eac10 -> 6e5f7be10
ARROW-1435: [Python] Properly handle time zone metadata in Parquet round trips
cc @jreback. Various bugs are fixed here, but the bottom line is that this enables tz-aware pandas data to be faithfully round-tripped through the Parquet format. We will need to implement compatibility tests in pandas for this, too.
Example DataFrame that could not be properly written before:
```python
s = pd.Series([datetime.datetime(2017, 9, 6)])
s = s.dt.tz_localize('utc')
s.index = s
# Both a column and an index to hit both use cases
df = pd.DataFrame({'tz_aware': s}, index=s)
```
Author: Wes McKinney <we...@twosigma.com>
Closes #1054 from wesm/ARROW-1435 and squashes the following commits:
6519945f [Wes McKinney] Add test for a non-UTC time zone too
20bb6dc1 [Wes McKinney] Get round trip for tz-aware index to Parquet working. Handle time zones in Column.to_pandas
f92abaa7 [Wes McKinney] Fix, initial test passing
6701cf0e [Wes McKinney] Initial cut at fixing tz aware columns to/from Parquet
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6e5f7be1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6e5f7be1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6e5f7be1
Branch: refs/heads/master
Commit: 6e5f7be1088641958163dfe59e06245bd08b22a6
Parents: 3033eac
Author: Wes McKinney <we...@twosigma.com>
Authored: Wed Sep 6 22:52:13 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed Sep 6 22:52:13 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/python/pandas_to_arrow.cc | 6 ++-
python/pyarrow/__init__.py | 3 +-
python/pyarrow/pandas_compat.py | 53 ++++++++++++++++++++----
python/pyarrow/scalar.pxi | 15 +++++--
python/pyarrow/table.pxi | 12 +++++-
python/pyarrow/tests/test_convert_pandas.py | 33 +++++++++++----
python/pyarrow/tests/test_parquet.py | 29 ++++++++++++-
python/pyarrow/tests/test_serialization.py | 4 +-
python/pyarrow/types.pxi | 18 +++++++-
9 files changed, 144 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/cpp/src/arrow/python/pandas_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/pandas_to_arrow.cc b/cpp/src/arrow/python/pandas_to_arrow.cc
index 8410381..2493779 100644
--- a/cpp/src/arrow/python/pandas_to_arrow.cc
+++ b/cpp/src/arrow/python/pandas_to_arrow.cc
@@ -347,8 +347,9 @@ class PandasConverter {
}
BufferVector buffers = {null_bitmap_, data};
- return PushArray(
- std::make_shared<ArrayData>(type_, length_, std::move(buffers), null_count, 0));
+ auto arr_data = std::make_shared<ArrayData>(type_, length_, std::move(buffers),
+ null_count, 0);
+ return PushArray(arr_data);
}
template <typename T>
@@ -1158,6 +1159,7 @@ Status PandasToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo,
PandasConverter converter(pool, ao, mo, type);
RETURN_NOT_OK(converter.Convert());
*out = converter.result()[0];
+ DCHECK(*out);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 2b6c9fe..0d76a35 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -65,7 +65,8 @@ from pyarrow.lib import (null, bool_,
FloatValue, DoubleValue, ListValue,
BinaryValue, StringValue, FixedSizeBinaryValue,
DecimalValue,
- Date32Value, Date64Value, TimestampValue)
+ Date32Value, Date64Value, TimestampValue,
+ TimestampType)
from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
FixedSizeBufferWriter,
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/pandas_compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 434b1c9..d1e6f5a 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -172,8 +172,8 @@ def construct_metadata(df, column_names, index_levels, preserve_index, types):
dict
"""
ncolumns = len(column_names)
- df_types = types[:ncolumns]
- index_types = types[ncolumns:ncolumns + len(index_levels)]
+ df_types = types[:ncolumns - len(index_levels)]
+ index_types = types[ncolumns - len(index_levels):]
column_metadata = [
get_column_metadata(df[col_name], name=sanitized_name,
@@ -269,13 +269,15 @@ def maybe_coerce_datetime64(values, dtype, type_, timestamps_to_ms=False):
return values, type_
+def make_datetimetz(tz):
+ from pyarrow.compat import DatetimeTZDtype
+ return DatetimeTZDtype('ns', tz=tz)
+
+
def table_to_blockmanager(options, table, memory_pool, nthreads=1):
import pandas.core.internals as _int
- from pyarrow.compat import DatetimeTZDtype
import pyarrow.lib as lib
- block_table = table
-
index_columns = []
index_arrays = []
index_names = []
@@ -286,6 +288,9 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
if metadata is not None and b'pandas' in metadata:
pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
index_columns = pandas_metadata['index_columns']
+ table = _add_any_metadata(table, pandas_metadata)
+
+ block_table = table
for name in index_columns:
i = schema.get_field_index(name)
@@ -293,13 +298,14 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
col = table.column(i)
index_name = (None if is_unnamed_index_level(name)
else name)
- values = col.to_pandas().values
+ col_pandas = col.to_pandas()
+ values = col_pandas.values
if not values.flags.writeable:
# ARROW-1054: in pandas 0.19.2, factorize will reject
# non-writeable arrays when calling MultiIndex.from_arrays
values = values.copy()
- index_arrays.append(values)
+ index_arrays.append(pd.Series(values, dtype=col_pandas.dtype))
index_names.append(index_name)
block_table = block_table.remove_column(
block_table.schema.get_field_index(name)
@@ -319,7 +325,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
klass=_int.CategoricalBlock,
fastpath=True)
elif 'timezone' in item:
- dtype = DatetimeTZDtype('ns', tz=item['timezone'])
+ dtype = make_datetimetz(item['timezone'])
block = _int.make_block(block_arr, placement=placement,
klass=_int.DatetimeTZBlock,
dtype=dtype, fastpath=True)
@@ -340,3 +346,34 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
]
return _int.BlockManager(blocks, axes)
+
+
+def _add_any_metadata(table, pandas_metadata):
+ modified_columns = {}
+
+ schema = table.schema
+
+ # Add time zones
+ for i, col_meta in enumerate(pandas_metadata['columns']):
+ if col_meta['pandas_type'] == 'datetimetz':
+ col = table[i]
+ converted = col.to_pandas()
+ tz = col_meta['metadata']['timezone']
+ tz_aware_type = pa.timestamp('ns', tz=tz)
+ with_metadata = pa.Array.from_pandas(converted.values,
+ type=tz_aware_type)
+
+ field = pa.field(schema[i].name, tz_aware_type)
+ modified_columns[i] = pa.Column.from_array(field,
+ with_metadata)
+
+ if len(modified_columns) > 0:
+ columns = []
+ for i in range(len(table.schema)):
+ if i in modified_columns:
+ columns.append(modified_columns[i])
+ else:
+ columns.append(table[i])
+ return pa.Table.from_arrays(columns)
+ else:
+ return table
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/scalar.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/scalar.pxi b/python/pyarrow/scalar.pxi
index 16d2bad..3a847f7 100644
--- a/python/pyarrow/scalar.pxi
+++ b/python/pyarrow/scalar.pxi
@@ -212,11 +212,18 @@ else:
cdef class TimestampValue(ArrayValue):
+ property value:
+
+ def __get__(self):
+ cdef CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
+ cdef CTimestampType* dtype = <CTimestampType*> ap.type().get()
+ return ap.Value(self.index)
+
def as_py(self):
- cdef:
- CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
- CTimestampType* dtype = <CTimestampType*> ap.type().get()
- int64_t value = ap.Value(self.index)
+ cdef CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
+ cdef CTimestampType* dtype = <CTimestampType*> ap.type().get()
+
+ value = self.value
if not dtype.timezone().empty():
import pytz
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index dd3359e..245371f 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -150,6 +150,8 @@ cdef class Column:
if isinstance(field_or_name, Field):
boxed_field = field_or_name
+ if arr.type != boxed_field.type:
+ raise ValueError('Passed field type does not match array')
else:
boxed_field = field(field_or_name, arr.type)
@@ -176,7 +178,15 @@ cdef class Column:
self.sp_column,
self, &out))
- return pd.Series(wrap_array_output(out), name=self.name)
+ values = wrap_array_output(out)
+ result = pd.Series(values, name=self.name)
+
+ if isinstance(self.type, TimestampType):
+ if self.type.tz is not None:
+ result = (result.dt.tz_localize('utc')
+ .dt.tz_convert(self.type.tz))
+
+ return result
def equals(self, Column other):
"""
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index f5107c2..52290d6 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -81,6 +81,16 @@ class TestPandasConversion(unittest.TestCase):
expected = df
tm.assert_frame_equal(result, expected, check_dtype=check_dtype)
+ def _check_series_roundtrip(self, s, type_=None):
+ arr = pa.Array.from_pandas(s, type=type_)
+
+ result = pd.Series(arr.to_pandas(), name=s.name)
+ if isinstance(arr.type, pa.TimestampType) and arr.type.tz is not None:
+ result = (result.dt.tz_localize('utc')
+ .dt.tz_convert(arr.type.tz))
+
+ tm.assert_series_equal(s, result)
+
def _check_array_roundtrip(self, values, expected=None, mask=None,
timestamps_to_ms=False, type=None):
arr = pa.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms,
@@ -347,9 +357,7 @@ class TestPandasConversion(unittest.TestCase):
field = pa.field('datetime64', pa.timestamp('ns'))
schema = pa.schema([field])
self._check_pandas_roundtrip(
- df,
- timestamps_to_ms=False,
- expected_schema=schema,
+ df, expected_schema=schema,
)
def test_timestamps_to_ms_explicit_schema(self):
@@ -389,9 +397,7 @@ class TestPandasConversion(unittest.TestCase):
field = pa.field('datetime64', pa.timestamp('ns'))
schema = pa.schema([field])
self._check_pandas_roundtrip(
- df,
- timestamps_to_ms=False,
- expected_schema=schema,
+ df, expected_schema=schema,
)
def test_timestamps_with_timezone(self):
@@ -406,6 +412,8 @@ class TestPandasConversion(unittest.TestCase):
.to_frame())
self._check_pandas_roundtrip(df, timestamps_to_ms=True)
+ self._check_series_roundtrip(df['datetime64'])
+
# drop-in a null and ns instead of ms
df = pd.DataFrame({
'datetime64': np.array([
@@ -417,7 +425,15 @@ class TestPandasConversion(unittest.TestCase):
})
df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern')
.to_frame())
- self._check_pandas_roundtrip(df, timestamps_to_ms=False)
+ self._check_pandas_roundtrip(df)
+
+ def test_timestamp_with_tz_to_pandas_type(self):
+ from pyarrow.compat import DatetimeTZDtype
+
+ tz = 'America/Los_Angeles'
+ t = pa.timestamp('ns', tz=tz)
+
+ assert t.to_pandas_dtype() == DatetimeTZDtype('ns', tz=tz)
def test_date_infer(self):
df = pd.DataFrame({
@@ -586,8 +602,7 @@ class TestPandasConversion(unittest.TestCase):
def test_threaded_conversion(self):
df = _alltypes_example()
- self._check_pandas_roundtrip(df, nthreads=2,
- timestamps_to_ms=False)
+ self._check_pandas_roundtrip(df, nthreads=2)
def test_category(self):
repeats = 5
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index ae5c28f..5dfe0a5 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -22,7 +22,7 @@ import os
import json
import pytest
-from pyarrow.compat import guid, u
+from pyarrow.compat import guid, u, BytesIO
from pyarrow.filesystem import LocalFileSystem
import pyarrow as pa
from .pandas_examples import dataframe_with_arrays, dataframe_with_lists
@@ -115,6 +115,33 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
@parquet
+def test_pandas_parquet_datetime_tz():
+ import pyarrow.parquet as pq
+
+ s = pd.Series([datetime.datetime(2017, 9, 6)])
+ s = s.dt.tz_localize('utc')
+
+ s.index = s
+
+ # Both a column and an index to hit both use cases
+ df = pd.DataFrame({'tz_aware': s,
+ 'tz_eastern': s.dt.tz_convert('US/Eastern')},
+ index=s)
+
+ f = BytesIO()
+
+ arrow_table = pa.Table.from_pandas(df)
+
+ _write_table(arrow_table, f, coerce_timestamps='ms')
+ f.seek(0)
+
+ table_read = pq.read_pandas(f)
+
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@parquet
def test_pandas_parquet_custom_metadata(tmpdir):
import pyarrow.parquet as pq
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/tests/test_serialization.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 8afcf0f..27243b0 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -100,9 +100,9 @@ PRIMITIVE_OBJECTS = [
if sys.version_info >= (3, 0):
PRIMITIVE_OBJECTS += [0, np.array([["hi", u"hi"], [1.3, 1]])]
else:
- PRIMITIVE_OBJECTS += [long(42), long(1 << 62), long(0),
+ PRIMITIVE_OBJECTS += [long(42), long(1 << 62), long(0), # noqa
np.array([["hi", u"hi"],
- [1.3, long(1)]])] # noqa: E501,F821
+ [1.3, long(1)]])] # noqa
COMPLEX_OBJECTS = [
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/types.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index fb6b961..3eaee6c 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -134,6 +134,16 @@ cdef class TimestampType(DataType):
else:
return None
+ def to_pandas_dtype(self):
+ """
+ Return the NumPy dtype that would be used for storing this
+ """
+ if self.tz is None:
+ return _pandas_type_map[_Type_TIMESTAMP]
+ else:
+ # Return DatetimeTZ
+ return pdcompat.make_datetimetz(self.tz)
+
cdef class Time32Type(DataType):
@@ -431,7 +441,13 @@ cdef class Schema:
with nogil:
check_status(PrettyPrint(deref(self.schema), options, &result))
- return frombytes(result)
+ printed = frombytes(result)
+ if self.metadata is not None:
+ import pprint
+ metadata_formatted = pprint.pformat(self.metadata)
+ printed += '\nmetadata\n--------\n' + metadata_formatted
+
+ return printed
def __repr__(self):
return self.__str__()