You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/18 19:18:07 UTC

arrow git commit: ARROW-1120: Support for writing timestamp(ns) to Int96

Repository: arrow
Updated Branches:
  refs/heads/master 362e754b3 -> c5a89b7b4


ARROW-1120: Support for writing timestamp(ns) to Int96

cc @c-nichols

Author: Uwe L. Korn <uw...@xhochy.com>
Author: Colin Nichols <ni...@gmail.com>

Closes #865 from xhochy/ARROW-1120 and squashes the following commits:

ff70832f [Uwe L. Korn] Use integer division
99f825d3 [Uwe L. Korn] Add flag for timestamp[ns] roundtrips
7c28835b [Colin Nichols] ARROW-1120 Support for writing timestamp(ns) to Int96


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/c5a89b7b
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/c5a89b7b
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/c5a89b7b

Branch: refs/heads/master
Commit: c5a89b7b4fb94c3988677c5d80405ff7e9cfbd18
Parents: 362e754
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Tue Jul 18 15:18:03 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Jul 18 15:18:03 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/_parquet.pxd          |  9 +++++++++
 python/pyarrow/_parquet.pyx          | 17 ++++++++++++++--
 python/pyarrow/parquet.py            | 11 +++++++----
 python/pyarrow/tests/test_parquet.py | 33 +++++++++++++++++++++++++++----
 4 files changed, 60 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/c5a89b7b/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 3d2d0c8..b1cd5eb 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -247,8 +247,17 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
         CStatus Open(const CSchema& schema, CMemoryPool* pool,
                      const shared_ptr[OutputStream]& sink,
                      const shared_ptr[WriterProperties]& properties,
+                     const shared_ptr[ArrowWriterProperties]& arrow_properties,
                      unique_ptr[FileWriter]* writer)
 
         CStatus WriteTable(const CTable& table, int64_t chunk_size)
         CStatus NewRowGroup(int64_t chunk_size)
         CStatus Close()
+
+    cdef cppclass ArrowWriterProperties:
+        cppclass Builder:
+            Builder()
+            Builder* disable_deprecated_int96_timestamps()
+            Builder* enable_deprecated_int96_timestamps()
+            shared_ptr[ArrowWriterProperties] build()
+        c_bool support_deprecated_int96_timestamps()

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5a89b7b/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 0e0d58e..bbe5203 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -545,13 +545,14 @@ cdef class ParquetWriter:
 
     cdef readonly:
         object use_dictionary
+        object use_deprecated_int96_timestamps
         object compression
         object version
         int row_group_size
 
     def __cinit__(self, where, Schema schema, use_dictionary=None,
                   compression=None, version=None,
-                  MemoryPool memory_pool=None):
+                  MemoryPool memory_pool=None, use_deprecated_int96_timestamps=False):
         cdef:
             shared_ptr[FileOutputStream] filestream
             shared_ptr[OutputStream] sink
@@ -566,6 +567,7 @@ cdef class ParquetWriter:
         self.use_dictionary = use_dictionary
         self.compression = compression
         self.version = version
+        self.use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
 
         cdef WriterProperties.Builder properties_builder
         self._set_version(&properties_builder)
@@ -573,10 +575,21 @@ cdef class ParquetWriter:
         self._set_dictionary_props(&properties_builder)
         properties = properties_builder.build()
 
+        cdef ArrowWriterProperties.Builder arrow_properties_builder
+        self._set_int96_support(&arrow_properties_builder)
+        arrow_properties = arrow_properties_builder.build()
+
         check_status(
             FileWriter.Open(deref(schema.schema),
                             maybe_unbox_memory_pool(memory_pool),
-                            sink, properties, &self.writer))
+                            sink, properties, arrow_properties,
+                            &self.writer))
+
+    cdef void _set_int96_support(self, ArrowWriterProperties.Builder* props):
+        if self.use_deprecated_int96_timestamps:
+            props.enable_deprecated_int96_timestamps()
+        else:
+            props.disable_deprecated_int96_timestamps()
 
     cdef void _set_version(self, WriterProperties.Builder* props):
         if self.version is not None:

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5a89b7b/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 06b3a3d..64cf330 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -743,7 +743,8 @@ def read_pandas(source, columns=None, nthreads=1, metadata=None):
 
 
 def write_table(table, where, row_group_size=None, version='1.0',
-                use_dictionary=True, compression='snappy', **kwargs):
+                use_dictionary=True, compression='snappy',
+                use_deprecated_int96_timestamps=False, **kwargs):
     """
     Write a Table to Parquet format
 
@@ -766,12 +767,13 @@ def write_table(table, where, row_group_size=None, version='1.0',
     writer = ParquetWriter(where, table.schema,
                            use_dictionary=use_dictionary,
                            compression=compression,
-                           version=version)
+                           version=version,
+                           use_deprecated_int96_timestamps=use_deprecated_int96_timestamps)
     writer.write_table(table, row_group_size=row_group_size)
     writer.close()
 
 
-def write_metadata(schema, where, version='1.0'):
+def write_metadata(schema, where, version='1.0', use_deprecated_int96_timestamps=False):
     """
     Write metadata-only Parquet file from schema
 
@@ -782,5 +784,6 @@ def write_metadata(schema, where, version='1.0'):
     version : {"1.0", "2.0"}, default "1.0"
         The Parquet format version, defaults to 1.0
     """
-    writer = ParquetWriter(where, schema, version=version)
+    writer = ParquetWriter(where, schema, version=version,
+                           use_deprecated_int96_timestamps=use_deprecated_int96_timestamps)
     writer.close()

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5a89b7b/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index d17eb14..40e44b3 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -456,20 +456,45 @@ def test_date_time_types():
     ex_t6 = pa.time32('ms')
     ex_a6 = pa.Array.from_pandas(data4 * 1000, type=ex_t6)
 
-    table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6],
+    t7 = pa.timestamp('ns')
+    start = pd.Timestamp('2001-01-01').value
+    data7 = np.array([start, start + 1, start + 2], dtype='int64')
+    a7 = pa.Array.from_pandas(data7, type=t7)
+
+    t7_us = pa.timestamp('us')
+    start = pd.Timestamp('2001-01-01').value
+    data7_us = np.array([start, start + 1, start + 2], dtype='int64') // 1000
+    a7_us = pa.Array.from_pandas(data7_us, type=t7_us)
+
+    table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7],
                                  ['date32', 'date64', 'timestamp[us]',
                                   'time32[s]', 'time64[us]',
-                                  'time32_from64[s]'])
+                                  'time32_from64[s]',
+                                  'timestamp[ns]'])
 
     # date64 as date32
     # time32[s] to time32[ms]
-    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6],
+    # 'timestamp[ns]' to 'timestamp[us]'
+    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7_us],
                                     ['date32', 'date64', 'timestamp[us]',
                                      'time32[s]', 'time64[us]',
-                                     'time32_from64[s]'])
+                                     'time32_from64[s]',
+                                     'timestamp[ns]'])
 
     _check_roundtrip(table, expected=expected, version='2.0')
 
+    # date64 as date32
+    # time32[s] to time32[ms]
+    # 'timestamp[ns]' is saved as INT96 timestamp
+    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
+                                    ['date32', 'date64', 'timestamp[us]',
+                                     'time32[s]', 'time64[us]',
+                                     'time32_from64[s]',
+                                     'timestamp[ns]'])
+
+    _check_roundtrip(table, expected=expected, version='2.0',
+                     use_deprecated_int96_timestamps=True)
+
     # Unsupported stuff
     def _assert_unsupported(array):
         table = pa.Table.from_arrays([array], ['unsupported'])