Posted to commits@arrow.apache.org by we...@apache.org on 2017/09/07 02:52:18 UTC

arrow git commit: ARROW-1435: [Python] Properly handle time zone metadata in Parquet round trips

Repository: arrow
Updated Branches:
  refs/heads/master 3033eac10 -> 6e5f7be10


ARROW-1435: [Python] Properly handle time zone metadata in Parquet round trips

cc @jreback. Various bugs are fixed here, but the bottom line is that this enables tz-aware pandas data to be faithfully round-tripped to Parquet format. We will also need to implement compatibility tests for this in pandas.

Example of a DataFrame that could not be properly written before:

```python
import datetime

import pandas as pd

s = pd.Series([datetime.datetime(2017, 9, 6)])
s = s.dt.tz_localize('utc')
s.index = s

# Both a column and an index to hit both use cases
df = pd.DataFrame({'tz_aware': s}, index=s)
```
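
With the patch applied, the full round trip looks roughly like the new `test_pandas_parquet_datetime_tz` below. A minimal sketch continuing from the frame above, writing to an in-memory buffer (the name `buf` is arbitrary):

```python
from io import BytesIO

import pandas.util.testing as tm
import pyarrow as pa
import pyarrow.parquet as pq

buf = BytesIO()
pq.write_table(pa.Table.from_pandas(df), buf, coerce_timestamps='ms')
buf.seek(0)

df_read = pq.read_pandas(buf).to_pandas()
tm.assert_frame_equal(df, df_read)  # time zone metadata survives the round trip
```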

Author: Wes McKinney <we...@twosigma.com>

Closes #1054 from wesm/ARROW-1435 and squashes the following commits:

6519945f [Wes McKinney] Add test for a non-UTC time zone too
20bb6dc1 [Wes McKinney] Get round trip for tz-aware index to Parquet working. Handle time zones in Column.to_pandas
f92abaa7 [Wes McKinney] Fix, initial test passing
6701cf0e [Wes McKinney] Initial cut at fixing tz aware columns to/from Parquet


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6e5f7be1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6e5f7be1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6e5f7be1

Branch: refs/heads/master
Commit: 6e5f7be1088641958163dfe59e06245bd08b22a6
Parents: 3033eac
Author: Wes McKinney <we...@twosigma.com>
Authored: Wed Sep 6 22:52:13 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed Sep 6 22:52:13 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/python/pandas_to_arrow.cc     |  6 ++-
 python/pyarrow/__init__.py                  |  3 +-
 python/pyarrow/pandas_compat.py             | 53 ++++++++++++++++++++----
 python/pyarrow/scalar.pxi                   | 15 +++++--
 python/pyarrow/table.pxi                    | 12 +++++-
 python/pyarrow/tests/test_convert_pandas.py | 33 +++++++++++----
 python/pyarrow/tests/test_parquet.py        | 29 ++++++++++++-
 python/pyarrow/tests/test_serialization.py  |  4 +-
 python/pyarrow/types.pxi                    | 18 +++++++-
 9 files changed, 144 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/cpp/src/arrow/python/pandas_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/pandas_to_arrow.cc b/cpp/src/arrow/python/pandas_to_arrow.cc
index 8410381..2493779 100644
--- a/cpp/src/arrow/python/pandas_to_arrow.cc
+++ b/cpp/src/arrow/python/pandas_to_arrow.cc
@@ -347,8 +347,9 @@ class PandasConverter {
     }
 
     BufferVector buffers = {null_bitmap_, data};
-    return PushArray(
-        std::make_shared<ArrayData>(type_, length_, std::move(buffers), null_count, 0));
+    auto arr_data = std::make_shared<ArrayData>(type_, length_, std::move(buffers),
+                                                null_count, 0);
+    return PushArray(arr_data);
   }
 
   template <typename T>
@@ -1158,6 +1159,7 @@ Status PandasToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo,
   PandasConverter converter(pool, ao, mo, type);
   RETURN_NOT_OK(converter.Convert());
   *out = converter.result()[0];
+  DCHECK(*out);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 2b6c9fe..0d76a35 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -65,7 +65,8 @@ from pyarrow.lib import (null, bool_,
                          FloatValue, DoubleValue, ListValue,
                          BinaryValue, StringValue, FixedSizeBinaryValue,
                          DecimalValue,
-                         Date32Value, Date64Value, TimestampValue)
+                         Date32Value, Date64Value, TimestampValue,
+                         TimestampType)
 
 from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
                          FixedSizeBufferWriter,

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/pandas_compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 434b1c9..d1e6f5a 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -172,8 +172,8 @@ def construct_metadata(df, column_names, index_levels, preserve_index, types):
     dict
     """
     ncolumns = len(column_names)
-    df_types = types[:ncolumns]
-    index_types = types[ncolumns:ncolumns + len(index_levels)]
+    df_types = types[:ncolumns - len(index_levels)]
+    index_types = types[ncolumns - len(index_levels):]
 
     column_metadata = [
         get_column_metadata(df[col_name], name=sanitized_name,
@@ -269,13 +269,15 @@ def maybe_coerce_datetime64(values, dtype, type_, timestamps_to_ms=False):
     return values, type_
 
 
+def make_datetimetz(tz):
+    from pyarrow.compat import DatetimeTZDtype
+    return DatetimeTZDtype('ns', tz=tz)
+
+
 def table_to_blockmanager(options, table, memory_pool, nthreads=1):
     import pandas.core.internals as _int
-    from pyarrow.compat import DatetimeTZDtype
     import pyarrow.lib as lib
 
-    block_table = table
-
     index_columns = []
     index_arrays = []
     index_names = []
@@ -286,6 +288,9 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
     if metadata is not None and b'pandas' in metadata:
         pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
         index_columns = pandas_metadata['index_columns']
+        table = _add_any_metadata(table, pandas_metadata)
+
+    block_table = table
 
     for name in index_columns:
         i = schema.get_field_index(name)
@@ -293,13 +298,14 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
             col = table.column(i)
             index_name = (None if is_unnamed_index_level(name)
                           else name)
-            values = col.to_pandas().values
+            col_pandas = col.to_pandas()
+            values = col_pandas.values
             if not values.flags.writeable:
                 # ARROW-1054: in pandas 0.19.2, factorize will reject
                 # non-writeable arrays when calling MultiIndex.from_arrays
                 values = values.copy()
 
-            index_arrays.append(values)
+            index_arrays.append(pd.Series(values, dtype=col_pandas.dtype))
             index_names.append(index_name)
             block_table = block_table.remove_column(
                 block_table.schema.get_field_index(name)
@@ -319,7 +325,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
                                     klass=_int.CategoricalBlock,
                                     fastpath=True)
         elif 'timezone' in item:
-            dtype = DatetimeTZDtype('ns', tz=item['timezone'])
+            dtype = make_datetimetz(item['timezone'])
             block = _int.make_block(block_arr, placement=placement,
                                     klass=_int.DatetimeTZBlock,
                                     dtype=dtype, fastpath=True)
@@ -340,3 +346,34 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
     ]
 
     return _int.BlockManager(blocks, axes)
+
+
+def _add_any_metadata(table, pandas_metadata):
+    modified_columns = {}
+
+    schema = table.schema
+
+    # Add time zones
+    for i, col_meta in enumerate(pandas_metadata['columns']):
+        if col_meta['pandas_type'] == 'datetimetz':
+            col = table[i]
+            converted = col.to_pandas()
+            tz = col_meta['metadata']['timezone']
+            tz_aware_type = pa.timestamp('ns', tz=tz)
+            with_metadata = pa.Array.from_pandas(converted.values,
+                                                 type=tz_aware_type)
+
+            field = pa.field(schema[i].name, tz_aware_type)
+            modified_columns[i] = pa.Column.from_array(field,
+                                                       with_metadata)
+
+    if len(modified_columns) > 0:
+        columns = []
+        for i in range(len(table.schema)):
+            if i in modified_columns:
+                columns.append(modified_columns[i])
+            else:
+                columns.append(table[i])
+        return pa.Table.from_arrays(columns)
+    else:
+        return table

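For readers skimming the diff: the essential move in `_add_any_metadata` is to rebuild each `datetimetz` column with a timestamp type carrying the zone recorded in the stored pandas metadata. A standalone sketch of that one step (the column name `ts` and the hard-coded zone are placeholders for values that would come from the metadata):

```python
import pandas as pd
import pyarrow as pa

naive = pd.Series(pd.date_range('2017-09-06', periods=3))
col = pa.Column.from_array('ts', pa.Array.from_pandas(naive))

tz = 'US/Eastern'  # in the real code, read from col_meta['metadata']['timezone']
tz_aware_type = pa.timestamp('ns', tz=tz)
with_metadata = pa.Array.from_pandas(col.to_pandas().values,
                                     type=tz_aware_type)
tz_col = pa.Column.from_array(pa.field('ts', tz_aware_type), with_metadata)
```
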
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/scalar.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/scalar.pxi b/python/pyarrow/scalar.pxi
index 16d2bad..3a847f7 100644
--- a/python/pyarrow/scalar.pxi
+++ b/python/pyarrow/scalar.pxi
@@ -212,11 +212,18 @@ else:
 
 cdef class TimestampValue(ArrayValue):
 
+    property value:
+
+        def __get__(self):
+            cdef CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
+            cdef CTimestampType* dtype = <CTimestampType*> ap.type().get()
+            return ap.Value(self.index)
+
     def as_py(self):
-        cdef:
-            CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
-            CTimestampType* dtype = <CTimestampType*> ap.type().get()
-            int64_t value = ap.Value(self.index)
+        cdef CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
+        cdef CTimestampType* dtype = <CTimestampType*> ap.type().get()
+
+        value = self.value
 
         if not dtype.timezone().empty():
             import pytz

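With the zone attached to the type, boxed scalars now come back tz-aware. A small sketch of the expected behavior (the output in the comment is approximate):

```python
import datetime

import pandas as pd
import pyarrow as pa

s = pd.Series([datetime.datetime(2017, 9, 6, 14, 30)])
arr = pa.Array.from_pandas(s.values, type=pa.timestamp('ns', tz='utc'))
arr[0].as_py()  # roughly datetime.datetime(2017, 9, 6, 14, 30, tzinfo=<UTC>)
```
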
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index dd3359e..245371f 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -150,6 +150,8 @@ cdef class Column:
 
         if isinstance(field_or_name, Field):
             boxed_field = field_or_name
+            if arr.type != boxed_field.type:
+                raise ValueError('Passed field type does not match array')
         else:
             boxed_field = field(field_or_name, arr.type)
 
@@ -176,7 +178,15 @@ cdef class Column:
                                                         self.sp_column,
                                                         self, &out))
 
-        return pd.Series(wrap_array_output(out), name=self.name)
+        values = wrap_array_output(out)
+        result = pd.Series(values, name=self.name)
+
+        if isinstance(self.type, TimestampType):
+            if self.type.tz is not None:
+                result = (result.dt.tz_localize('utc')
+                          .dt.tz_convert(self.type.tz))
+
+        return result
 
     def equals(self, Column other):
         """

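The `Column.to_pandas` change uses the standard pandas idiom for re-attaching a zone to UTC-normalized values, which is all Arrow stores for tz-aware timestamps:

```python
import pandas as pd

# Values come out of Arrow as naive UTC; localize, then convert to the column's zone
utc_values = pd.Series(pd.to_datetime(['2017-09-06 14:00:00']))
restored = utc_values.dt.tz_localize('utc').dt.tz_convert('US/Eastern')
```
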
http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index f5107c2..52290d6 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -81,6 +81,16 @@ class TestPandasConversion(unittest.TestCase):
             expected = df
         tm.assert_frame_equal(result, expected, check_dtype=check_dtype)
 
+    def _check_series_roundtrip(self, s, type_=None):
+        arr = pa.Array.from_pandas(s, type=type_)
+
+        result = pd.Series(arr.to_pandas(), name=s.name)
+        if isinstance(arr.type, pa.TimestampType) and arr.type.tz is not None:
+            result = (result.dt.tz_localize('utc')
+                      .dt.tz_convert(arr.type.tz))
+
+        tm.assert_series_equal(s, result)
+
     def _check_array_roundtrip(self, values, expected=None, mask=None,
                                timestamps_to_ms=False, type=None):
         arr = pa.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms,
@@ -347,9 +357,7 @@ class TestPandasConversion(unittest.TestCase):
         field = pa.field('datetime64', pa.timestamp('ns'))
         schema = pa.schema([field])
         self._check_pandas_roundtrip(
-            df,
-            timestamps_to_ms=False,
-            expected_schema=schema,
+            df, expected_schema=schema,
         )
 
     def test_timestamps_to_ms_explicit_schema(self):
@@ -389,9 +397,7 @@ class TestPandasConversion(unittest.TestCase):
         field = pa.field('datetime64', pa.timestamp('ns'))
         schema = pa.schema([field])
         self._check_pandas_roundtrip(
-            df,
-            timestamps_to_ms=False,
-            expected_schema=schema,
+            df, expected_schema=schema,
         )
 
     def test_timestamps_with_timezone(self):
@@ -406,6 +412,8 @@ class TestPandasConversion(unittest.TestCase):
                             .to_frame())
         self._check_pandas_roundtrip(df, timestamps_to_ms=True)
 
+        self._check_series_roundtrip(df['datetime64'])
+
         # drop-in a null and ns instead of ms
         df = pd.DataFrame({
             'datetime64': np.array([
@@ -417,7 +425,15 @@ class TestPandasConversion(unittest.TestCase):
         })
         df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern')
                             .to_frame())
-        self._check_pandas_roundtrip(df, timestamps_to_ms=False)
+        self._check_pandas_roundtrip(df)
+
+    def test_timestamp_with_tz_to_pandas_type(self):
+        from pyarrow.compat import DatetimeTZDtype
+
+        tz = 'America/Los_Angeles'
+        t = pa.timestamp('ns', tz=tz)
+
+        assert t.to_pandas_dtype() == DatetimeTZDtype('ns', tz=tz)
 
     def test_date_infer(self):
         df = pd.DataFrame({
@@ -586,8 +602,7 @@ class TestPandasConversion(unittest.TestCase):
 
     def test_threaded_conversion(self):
         df = _alltypes_example()
-        self._check_pandas_roundtrip(df, nthreads=2,
-                                     timestamps_to_ms=False)
+        self._check_pandas_roundtrip(df, nthreads=2)
 
     def test_category(self):
         repeats = 5

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index ae5c28f..5dfe0a5 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -22,7 +22,7 @@ import os
 import json
 import pytest
 
-from pyarrow.compat import guid, u
+from pyarrow.compat import guid, u, BytesIO
 from pyarrow.filesystem import LocalFileSystem
 import pyarrow as pa
 from .pandas_examples import dataframe_with_arrays, dataframe_with_lists
@@ -115,6 +115,33 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
 
 
 @parquet
+def test_pandas_parquet_datetime_tz():
+    import pyarrow.parquet as pq
+
+    s = pd.Series([datetime.datetime(2017, 9, 6)])
+    s = s.dt.tz_localize('utc')
+
+    s.index = s
+
+    # Both a column and an index to hit both use cases
+    df = pd.DataFrame({'tz_aware': s,
+                       'tz_eastern': s.dt.tz_convert('US/Eastern')},
+                      index=s)
+
+    f = BytesIO()
+
+    arrow_table = pa.Table.from_pandas(df)
+
+    _write_table(arrow_table, f, coerce_timestamps='ms')
+    f.seek(0)
+
+    table_read = pq.read_pandas(f)
+
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@parquet
 def test_pandas_parquet_custom_metadata(tmpdir):
     import pyarrow.parquet as pq
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/tests/test_serialization.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 8afcf0f..27243b0 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -100,9 +100,9 @@ PRIMITIVE_OBJECTS = [
 if sys.version_info >= (3, 0):
     PRIMITIVE_OBJECTS += [0, np.array([["hi", u"hi"], [1.3, 1]])]
 else:
-    PRIMITIVE_OBJECTS += [long(42), long(1 << 62), long(0),
+    PRIMITIVE_OBJECTS += [long(42), long(1 << 62), long(0),  # noqa
                           np.array([["hi", u"hi"],
-                          [1.3, long(1)]])]  # noqa: E501,F821
+                          [1.3, long(1)]])]  # noqa
 
 
 COMPLEX_OBJECTS = [

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e5f7be1/python/pyarrow/types.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index fb6b961..3eaee6c 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -134,6 +134,16 @@ cdef class TimestampType(DataType):
             else:
                 return None
 
+    def to_pandas_dtype(self):
+        """
+        Return the NumPy dtype that would be used for storing this
+        """
+        if self.tz is None:
+            return _pandas_type_map[_Type_TIMESTAMP]
+        else:
+            # Return DatetimeTZ
+            return pdcompat.make_datetimetz(self.tz)
+
 
 cdef class Time32Type(DataType):
 
@@ -431,7 +441,13 @@ cdef class Schema:
         with nogil:
             check_status(PrettyPrint(deref(self.schema), options, &result))
 
-        return frombytes(result)
+        printed = frombytes(result)
+        if self.metadata is not None:
+            import pprint
+            metadata_formatted = pprint.pformat(self.metadata)
+            printed += '\nmetadata\n--------\n' + metadata_formatted
+
+        return printed
 
     def __repr__(self):
         return self.__str__()
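
The `TimestampType.to_pandas_dtype` addition mirrors the new `test_timestamp_with_tz_to_pandas_type` test above; a quick sketch (the naive case returning `datetime64[ns]` follows from `_pandas_type_map`):

```python
import pyarrow as pa
from pyarrow.compat import DatetimeTZDtype

t = pa.timestamp('ns', tz='America/Los_Angeles')
assert t.to_pandas_dtype() == DatetimeTZDtype('ns', tz='America/Los_Angeles')

# Without a zone, the plain NumPy dtype is returned
assert pa.timestamp('ns').to_pandas_dtype() == 'datetime64[ns]'
```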