Posted to commits@arrow.apache.org by ks...@apache.org on 2020/05/13 01:15:27 UTC

[arrow] 11/17: ARROW-8657: [C++][Python] Add separate configuration for data pages

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-0.17.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 3a61e9c4018860d893e5d7429c951b96adb81381
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Tue May 5 17:14:42 2020 -0500

    ARROW-8657: [C++][Python] Add separate configuration for data pages
    
    - Adds a separate writer configuration option to determine which
      data page version to use.
    - Plumbs this through to Python.
    - At the moment, version and data page version are completely
      independent (a short usage sketch follows below).
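
    A minimal usage sketch of the option as exposed in Python (the file
    name and data are illustrative):

        import pyarrow as pa
        import pyarrow.parquet as pq

        table = pa.table({'a': [1, 2, 3]})
        # The logical-type version and the data page version are chosen
        # independently of each other.
        pq.write_table(table, 'example.parquet', version='2.0',
                       data_page_version='2.0')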
    
    Closes #7089 from emkornfield/ARROW-8657
    
    Lead-authored-by: Micah Kornfield <em...@gmail.com>
    Co-authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/parquet/column_writer.cc     | 16 +++++++-------
 cpp/src/parquet/properties.h         | 42 +++++++++++++++++++++++++++++++-----
 cpp/src/parquet/properties_test.cc   |  5 ++++-
 python/pyarrow/_parquet.pxd          |  9 ++++++++
 python/pyarrow/_parquet.pyx          | 17 ++++++++++++++-
 python/pyarrow/parquet.py            | 18 +++++++++++++++-
 python/pyarrow/tests/test_parquet.py | 27 ++++++++++++++---------
 7 files changed, 108 insertions(+), 26 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index dbb7df2..e37beba 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -729,24 +729,24 @@ void ColumnWriterImpl::AddDataPage() {
   int64_t repetition_levels_rle_size = 0;
 
   std::shared_ptr<Buffer> values = GetValuesBuffer();
-  bool is_v1 = properties_->version() == ParquetVersion::PARQUET_1_0;
+  bool is_v1_data_page = properties_->data_page_version() == ParquetDataPageVersion::V1;
 
   if (descr_->max_definition_level() > 0) {
-    definition_levels_rle_size =
-        RleEncodeLevels(definition_levels_sink_.data(), definition_levels_rle_.get(),
-                        descr_->max_definition_level(), /*include_length_prefix=*/is_v1);
+    definition_levels_rle_size = RleEncodeLevels(
+        definition_levels_sink_.data(), definition_levels_rle_.get(),
+        descr_->max_definition_level(), /*include_length_prefix=*/is_v1_data_page);
   }
 
   if (descr_->max_repetition_level() > 0) {
-    repetition_levels_rle_size =
-        RleEncodeLevels(repetition_levels_sink_.data(), repetition_levels_rle_.get(),
-                        descr_->max_repetition_level(), /*include_length_prefix=*/is_v1);
+    repetition_levels_rle_size = RleEncodeLevels(
+        repetition_levels_sink_.data(), repetition_levels_rle_.get(),
+        descr_->max_repetition_level(), /*include_length_prefix=*/is_v1_data_page);
   }
 
   int64_t uncompressed_size =
       definition_levels_rle_size + repetition_levels_rle_size + values->size();
 
-  if (is_v1) {
+  if (is_v1_data_page) {
     BuildDataPageV1(definition_levels_rle_size, repetition_levels_rle_size,
                     uncompressed_size, values);
   } else {
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 9c2ec1d..df4fb41 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -34,10 +34,30 @@
 
 namespace parquet {
 
+/// Determines use of Parquet Format version >= 2.0.0 logical types. For
+/// example, when writing from Arrow data structures, PARQUET_2_0 will enable
+/// use of INT_* and UINT_* converted types as well as nanosecond timestamps
+/// stored physically as INT64. Since some Parquet implementations do not
+/// support the logical types added in the 2.0.0 format version, if you want to
+/// maximize compatibility of your files you may want to use PARQUET_1_0.
+///
+/// Note that the 2.x format version series also introduced new serialized
+/// data page metadata and on disk data page layout. To enable this, use
+/// ParquetDataPageVersion.
 struct ParquetVersion {
   enum type { PARQUET_1_0, PARQUET_2_0 };
 };
 
+/// Controls serialization format of data pages.  parquet-format v2.0.0
+/// introduced a new data page metadata type DataPageV2 and serialized page
+/// structure (for example, encoded levels are no longer compressed). Prior to
+/// the completion of PARQUET-457 in 2020, this library did not implement
+/// DataPageV2 correctly, so if you use the V2 data page format, you may have
+/// forward compatibility issues (older versions of the library will be unable
+/// to read the files). Note that some Parquet implementations do not implement
+/// DataPageV2 at all.
+enum class ParquetDataPageVersion { V1, V2 };
+
 static int64_t DEFAULT_BUFFER_SIZE = 1024;
 static bool DEFAULT_USE_BUFFERED_STREAM = false;
 
@@ -89,8 +109,6 @@ static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
 static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
 static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
-static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
-    ParquetVersion::PARQUET_1_0;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
 
@@ -159,7 +177,8 @@ class PARQUET_EXPORT WriterProperties {
           write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
           max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
           pagesize_(kDefaultDataPageSize),
-          version_(DEFAULT_WRITER_VERSION),
+          version_(ParquetVersion::PARQUET_1_0),
+          data_page_version_(ParquetDataPageVersion::V1),
           created_by_(DEFAULT_CREATED_BY) {}
     virtual ~Builder() {}
 
@@ -216,6 +235,11 @@ class PARQUET_EXPORT WriterProperties {
       return this;
     }
 
+    Builder* data_page_version(ParquetDataPageVersion data_page_version) {
+      data_page_version_ = data_page_version;
+      return this;
+    }
+
     Builder* version(ParquetVersion::type version) {
       version_ = version;
       return this;
@@ -394,7 +418,7 @@ class PARQUET_EXPORT WriterProperties {
       return std::shared_ptr<WriterProperties>(new WriterProperties(
           pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
           pagesize_, version_, created_by_, std::move(file_encryption_properties_),
-          default_column_properties_, column_properties));
+          default_column_properties_, column_properties, data_page_version_));
     }
 
    private:
@@ -404,6 +428,7 @@ class PARQUET_EXPORT WriterProperties {
     int64_t max_row_group_length_;
     int64_t pagesize_;
     ParquetVersion::type version_;
+    ParquetDataPageVersion data_page_version_;
     std::string created_by_;
 
     std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
@@ -427,6 +452,10 @@ class PARQUET_EXPORT WriterProperties {
 
   inline int64_t data_pagesize() const { return pagesize_; }
 
+  inline ParquetDataPageVersion data_page_version() const {
+    return parquet_data_page_version_;
+  }
+
   inline ParquetVersion::type version() const { return parquet_version_; }
 
   inline std::string created_by() const { return parquet_created_by_; }
@@ -498,12 +527,14 @@ class PARQUET_EXPORT WriterProperties {
       const std::string& created_by,
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
-      const std::unordered_map<std::string, ColumnProperties>& column_properties)
+      const std::unordered_map<std::string, ColumnProperties>& column_properties,
+      ParquetDataPageVersion data_page_version)
       : pool_(pool),
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
         max_row_group_length_(max_row_group_length),
         pagesize_(pagesize),
+        parquet_data_page_version_(data_page_version),
         parquet_version_(version),
         parquet_created_by_(created_by),
         file_encryption_properties_(file_encryption_properties),
@@ -515,6 +546,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
   int64_t pagesize_;
+  ParquetDataPageVersion parquet_data_page_version_;
   ParquetVersion::type parquet_version_;
   std::string parquet_created_by_;
 
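For context, a minimal round-trip sketch of the new data page setting as
surfaced in Python (file name is illustrative); readers lacking DataPageV2
support may be unable to read such a file:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'x': [1.0, 2.0, 3.0]})
    # Write DataPageV2 pages; the logical-type version is left at its
    # default and is unaffected by this option.
    pq.write_table(table, 'v2_pages.parquet', data_page_version='2.0')

    # A library version that includes PARQUET-457 reads the file back.
    assert pq.read_table('v2_pages.parquet').equals(table)
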
diff --git a/cpp/src/parquet/properties_test.cc b/cpp/src/parquet/properties_test.cc
index 94ff79a..aef563b 100644
--- a/cpp/src/parquet/properties_test.cc
+++ b/cpp/src/parquet/properties_test.cc
@@ -43,7 +43,8 @@ TEST(TestWriterProperties, Basics) {
 
   ASSERT_EQ(kDefaultDataPageSize, props->data_pagesize());
   ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
-  ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
+  ASSERT_EQ(ParquetVersion::PARQUET_1_0, props->version());
+  ASSERT_EQ(ParquetDataPageVersion::V1, props->data_page_version());
 }
 
 TEST(TestWriterProperties, AdvancedHandling) {
@@ -53,6 +54,7 @@ TEST(TestWriterProperties, AdvancedHandling) {
   builder.compression(Compression::SNAPPY);
   builder.encoding(Encoding::DELTA_BINARY_PACKED);
   builder.encoding("delta-length", Encoding::DELTA_LENGTH_BYTE_ARRAY);
+  builder.data_page_version(ParquetDataPageVersion::V2);
   std::shared_ptr<WriterProperties> props = builder.build();
 
   ASSERT_EQ(Compression::GZIP, props->compression(ColumnPath::FromDotString("gzip")));
@@ -63,6 +65,7 @@ TEST(TestWriterProperties, AdvancedHandling) {
             props->encoding(ColumnPath::FromDotString("gzip")));
   ASSERT_EQ(Encoding::DELTA_LENGTH_BYTE_ARRAY,
             props->encoding(ColumnPath::FromDotString("delta-length")));
+  ASSERT_EQ(ParquetDataPageVersion::V2, props->data_page_version());
 }
 
 TEST(TestReaderProperties, GetStreamInsufficientData) {
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index e737113..2b370b3 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -347,6 +347,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
 cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
     cdef cppclass WriterProperties:
         cppclass Builder:
+            Builder* data_page_version(ParquetDataPageVersion version)
             Builder* version(ParquetVersion version)
             Builder* compression(ParquetCompression codec)
             Builder* compression(const c_string& path,
@@ -443,6 +444,14 @@ cdef extern from "parquet/properties.h" namespace "parquet" nogil:
         V1 "parquet::ArrowWriterProperties::V1",
         V2 "parquet::ArrowWriterProperties::V2"
 
+    cdef cppclass ParquetDataPageVersion:
+        pass
+
+    cdef ParquetDataPageVersion ParquetDataPageVersion_V1 \
+        " parquet::ParquetDataPageVersion::V1"
+    cdef ParquetDataPageVersion ParquetDataPageVersion_V2 \
+        " parquet::ParquetDataPageVersion::V2"
+
 cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
     cdef cppclass FileWriter:
 
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index a8dbc0e..de9b23a 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1207,6 +1207,7 @@ cdef class ParquetWriter:
         object allow_truncated_timestamps
         object compression
         object compression_level
+        object data_page_version
         object version
         object write_statistics
         object writer_engine_version
@@ -1223,7 +1224,8 @@ cdef class ParquetWriter:
                   allow_truncated_timestamps=False,
                   compression_level=None,
                   use_byte_stream_split=False,
-                  writer_engine_version=None):
+                  writer_engine_version=None,
+                  data_page_version=None):
         cdef:
             shared_ptr[WriterProperties] properties
             c_string c_where
@@ -1250,8 +1252,10 @@ cdef class ParquetWriter:
         self.allow_truncated_timestamps = allow_truncated_timestamps
         self.use_byte_stream_split = use_byte_stream_split
         self.writer_engine_version = writer_engine_version
+        self.data_page_version = data_page_version
 
         cdef WriterProperties.Builder properties_builder
+        self._set_data_page_version(&properties_builder)
         self._set_version(&properties_builder)
         self._set_compression_props(&properties_builder)
         self._set_dictionary_props(&properties_builder)
@@ -1324,6 +1328,17 @@ cdef class ParquetWriter:
                 raise ValueError("Unsupported Parquet format version: {0}"
                                  .format(self.version))
 
+    cdef int _set_data_page_version(self, WriterProperties.Builder* props) \
+            except -1:
+        if self.data_page_version is not None:
+            if self.data_page_version == "1.0":
+                props.data_page_version(ParquetDataPageVersion_V1)
+            elif self.data_page_version == "2.0":
+                props.data_page_version(ParquetDataPageVersion_V2)
+            else:
+                raise ValueError("Unsupported Parquet data page version: {0}"
+                                 .format(self.data_page_version))
+
     cdef void _set_compression_props(self, WriterProperties.Builder* props) \
             except *:
         if isinstance(self.compression, basestring):
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index fcd7454..3158e8f 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -432,7 +432,15 @@ def _sanitize_table(table, new_schema, flavor):
 
 
 _parquet_writer_arg_docs = """version : {"1.0", "2.0"}, default "1.0"
-    The Parquet format version, defaults to 1.0.
+    Determine which Parquet logical types are available for use, whether the
+    reduced set from the Parquet 1.x.x format or the expanded logical types
+    added in format version 2.0.0 and after. Note that files written with
+    version='2.0' may not be readable in all Parquet implementations, so
+    version='1.0' is likely the choice that maximizes file compatibility. Some
+    features, such as lossless storage of nanosecond timestamps as INT64
+    physical storage, are only available with version='2.0'. The Parquet 2.0.0
+    format version also introduced a new serialized data page format; this can
+    be enabled separately using the data_page_version option.
 use_dictionary : bool or list
     Specify if we should use dictionary encoding in general or only for
     some columns.
@@ -481,6 +489,10 @@ writer_engine_version: str, default "V2"
     all nested types. V1 is legacy and will be removed in a future release.
     Setting the environment variable ARROW_PARQUET_WRITER_ENGINE will
     override the default.
+data_page_version : {"1.0", "2.0"}, default "1.0"
+    The serialized Parquet data page format version to write, defaults to
+    1.0. This does not impact the file schema logical types and Arrow to
+    Parquet type casting behavior; for that use the "version" option.
 """
 
 
@@ -511,6 +523,7 @@ schema : arrow Schema
                  compression_level=None,
                  use_byte_stream_split=False,
                  writer_engine_version=None,
+                 data_page_version='1.0',
                  **options):
         if use_deprecated_int96_timestamps is None:
             # Use int96 timestamps for Spark
@@ -549,6 +562,7 @@ schema : arrow Schema
             compression_level=compression_level,
             use_byte_stream_split=use_byte_stream_split,
             writer_engine_version=engine_version,
+            data_page_version=data_page_version,
             **options)
         self.is_open = True
 
@@ -1584,6 +1598,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
                 filesystem=None,
                 compression_level=None,
                 use_byte_stream_split=False,
+                data_page_version='1.0',
                 **kwargs):
     row_group_size = kwargs.pop('chunk_size', row_group_size)
     use_int96 = use_deprecated_int96_timestamps
@@ -1602,6 +1617,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
                 use_deprecated_int96_timestamps=use_int96,
                 compression_level=compression_level,
                 use_byte_stream_split=use_byte_stream_split,
+                data_page_version=data_page_version,
                 **kwargs) as writer:
             writer.write_table(table, row_group_size=row_group_size)
     except Exception:
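
For reference, a sketch of the same option used through ParquetWriter
directly (the output path is illustrative):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'a': [1, 2, 3]})
    with pq.ParquetWriter('out.parquet', table.schema,
                          data_page_version='2.0') as writer:
        writer.write_table(table)
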
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index f215eaa..5e6e227 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -210,6 +210,10 @@ def test_parquet_invalid_version(tempdir):
     table = pa.table({'a': [1, 2, 3]})
     with pytest.raises(ValueError, match="Unsupported Parquet format version"):
         _write_table(table, tempdir / 'test_version.parquet', version="2.2")
+    with pytest.raises(ValueError, match="Unsupported Parquet data page " +
+                       "version"):
+        _write_table(table, tempdir / 'test_version.parquet',
+                     data_page_version="2.2")
 
 
 @parametrize_legacy_dataset
@@ -230,16 +234,19 @@ def test_chunked_table_write(use_legacy_dataset):
     # ARROW-232
     df = alltypes_sample(size=10)
 
-    batch = pa.RecordBatch.from_pandas(df)
-    table = pa.Table.from_batches([batch] * 3)
-    _check_roundtrip(
-        table, version='2.0', use_legacy_dataset=use_legacy_dataset)
+    for data_page_version in ['1.0', '2.0']:
+        batch = pa.RecordBatch.from_pandas(df)
+        table = pa.Table.from_batches([batch] * 3)
+        _check_roundtrip(
+            table, version='2.0', use_legacy_dataset=use_legacy_dataset,
+            data_page_version=data_page_version)
 
-    df, _ = dataframe_with_lists()
-    batch = pa.RecordBatch.from_pandas(df)
-    table = pa.Table.from_batches([batch] * 3)
-    _check_roundtrip(
-        table, version='2.0', use_legacy_dataset=use_legacy_dataset)
+        df, _ = dataframe_with_lists()
+        batch = pa.RecordBatch.from_pandas(df)
+        table = pa.Table.from_batches([batch] * 3)
+        _check_roundtrip(
+            table, version='2.0', use_legacy_dataset=use_legacy_dataset,
+            data_page_version=data_page_version)
 
 
 @pytest.mark.pandas
@@ -3738,7 +3745,7 @@ def test_multi_dataset_metadata(tempdir):
         'one': [1, 2, 3],
         'two': [-1, -2, -3],
         'three': [[1, 2], [2, 3], [3, 4]],
-        })
+    })
     table = pa.Table.from_pandas(df)
 
     # write dataset twice and collect/merge metadata