Posted to commits@arrow.apache.org by ks...@apache.org on 2020/05/13 01:15:27 UTC
[arrow] 11/17: ARROW-8657: [C++][Python] Add separate configuration for data pages
This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch maint-0.17.x
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 3a61e9c4018860d893e5d7429c951b96adb81381
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Tue May 5 17:14:42 2020 -0500
ARROW-8657: [C++][Python] Add separate configuration for data pages
- Adds a separate write config to determine which version of data page to use.
- Plumbs this through to Python (see the usage sketch below).
- At the moment, version and data page version are completely independent.
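A minimal sketch of the new option as it surfaces in pyarrow (the table contents
and file names are placeholders, not part of the change; the data_page_version
keyword and its "1.0"/"2.0" values come from the pyarrow changes below):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'a': [1, 2, 3]})

    # Default behaviour is unchanged: DataPageV1 pages are written.
    pq.write_table(table, 'example_v1.parquet')

    # Opt in to the DataPageV2 serialization introduced in parquet-format 2.0.0.
    pq.write_table(table, 'example_v2.parquet', data_page_version='2.0')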
Closes #7089 from emkornfield/ARROW-8657
Lead-authored-by: Micah Kornfield <em...@gmail.com>
Co-authored-by: Wes McKinney <we...@apache.org>
Signed-off-by: Wes McKinney <we...@apache.org>
---
cpp/src/parquet/column_writer.cc | 16 +++++++-------
cpp/src/parquet/properties.h | 42 +++++++++++++++++++++++++++++++-----
cpp/src/parquet/properties_test.cc | 5 ++++-
python/pyarrow/_parquet.pxd | 9 ++++++++
python/pyarrow/_parquet.pyx | 17 ++++++++++++++-
python/pyarrow/parquet.py | 18 +++++++++++++++-
python/pyarrow/tests/test_parquet.py | 27 ++++++++++++++---------
7 files changed, 108 insertions(+), 26 deletions(-)
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index dbb7df2..e37beba 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -729,24 +729,24 @@ void ColumnWriterImpl::AddDataPage() {
int64_t repetition_levels_rle_size = 0;
std::shared_ptr<Buffer> values = GetValuesBuffer();
- bool is_v1 = properties_->version() == ParquetVersion::PARQUET_1_0;
+ bool is_v1_data_page = properties_->data_page_version() == ParquetDataPageVersion::V1;
if (descr_->max_definition_level() > 0) {
- definition_levels_rle_size =
- RleEncodeLevels(definition_levels_sink_.data(), definition_levels_rle_.get(),
- descr_->max_definition_level(), /*include_length_prefix=*/is_v1);
+ definition_levels_rle_size = RleEncodeLevels(
+ definition_levels_sink_.data(), definition_levels_rle_.get(),
+ descr_->max_definition_level(), /*include_length_prefix=*/is_v1_data_page);
}
if (descr_->max_repetition_level() > 0) {
- repetition_levels_rle_size =
- RleEncodeLevels(repetition_levels_sink_.data(), repetition_levels_rle_.get(),
- descr_->max_repetition_level(), /*include_length_prefix=*/is_v1);
+ repetition_levels_rle_size = RleEncodeLevels(
+ repetition_levels_sink_.data(), repetition_levels_rle_.get(),
+ descr_->max_repetition_level(), /*include_length_prefix=*/is_v1_data_page);
}
int64_t uncompressed_size =
definition_levels_rle_size + repetition_levels_rle_size + values->size();
- if (is_v1) {
+ if (is_v1_data_page) {
BuildDataPageV1(definition_levels_rle_size, repetition_levels_rle_size,
uncompressed_size, values);
} else {
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 9c2ec1d..df4fb41 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -34,10 +34,30 @@
namespace parquet {
+/// Determines use of Parquet Format version >= 2.0.0 logical types. For
+/// example, when writing from Arrow data structures, PARQUET_2_0 will enable
+/// use of INT_* and UINT_* converted types as well as nanosecond timestamps
+/// stored physically as INT64. Since some Parquet implementations do not
+/// support the logical types added in the 2.0.0 format version, if you want to
+/// maximize compatibility of your files you may want to use PARQUET_1_0.
+///
+/// Note that the 2.x format version series also introduced new serialized
+/// data page metadata and on disk data page layout. To enable this, use
+/// ParquetDataPageVersion.
struct ParquetVersion {
enum type { PARQUET_1_0, PARQUET_2_0 };
};
+/// Controls serialization format of data pages. parquet-format v2.0.0
+/// introduced a new data page metadata type DataPageV2 and serialized page
+/// structure (for example, encoded levels are no longer compressed). Prior to
+/// the completion of PARQUET-457 in 2020, this library did not implement
+/// DataPageV2 correctly, so if you use the V2 data page format, you may have
+/// forward compatibility issues (older versions of the library will be unable
+/// to read the files). Note that some Parquet implementations do not implement
+/// DataPageV2 at all.
+enum class ParquetDataPageVersion { V1, V2 };
+
static int64_t DEFAULT_BUFFER_SIZE = 1024;
static bool DEFAULT_USE_BUFFERED_STREAM = false;
@@ -89,8 +109,6 @@ static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
-static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
- ParquetVersion::PARQUET_1_0;
static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
@@ -159,7 +177,8 @@ class PARQUET_EXPORT WriterProperties {
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
pagesize_(kDefaultDataPageSize),
- version_(DEFAULT_WRITER_VERSION),
+ version_(ParquetVersion::PARQUET_1_0),
+ data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY) {}
virtual ~Builder() {}
@@ -216,6 +235,11 @@ class PARQUET_EXPORT WriterProperties {
return this;
}
+ Builder* data_page_version(ParquetDataPageVersion data_page_version) {
+ data_page_version_ = data_page_version;
+ return this;
+ }
+
Builder* version(ParquetVersion::type version) {
version_ = version;
return this;
@@ -394,7 +418,7 @@ class PARQUET_EXPORT WriterProperties {
return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
pagesize_, version_, created_by_, std::move(file_encryption_properties_),
- default_column_properties_, column_properties));
+ default_column_properties_, column_properties, data_page_version_));
}
private:
@@ -404,6 +428,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t max_row_group_length_;
int64_t pagesize_;
ParquetVersion::type version_;
+ ParquetDataPageVersion data_page_version_;
std::string created_by_;
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
@@ -427,6 +452,10 @@ class PARQUET_EXPORT WriterProperties {
inline int64_t data_pagesize() const { return pagesize_; }
+ inline ParquetDataPageVersion data_page_version() const {
+ return parquet_data_page_version_;
+ }
+
inline ParquetVersion::type version() const { return parquet_version_; }
inline std::string created_by() const { return parquet_created_by_; }
@@ -498,12 +527,14 @@ class PARQUET_EXPORT WriterProperties {
const std::string& created_by,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
- const std::unordered_map<std::string, ColumnProperties>& column_properties)
+ const std::unordered_map<std::string, ColumnProperties>& column_properties,
+ ParquetDataPageVersion data_page_version)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
max_row_group_length_(max_row_group_length),
pagesize_(pagesize),
+ parquet_data_page_version_(data_page_version),
parquet_version_(version),
parquet_created_by_(created_by),
file_encryption_properties_(file_encryption_properties),
@@ -515,6 +546,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t write_batch_size_;
int64_t max_row_group_length_;
int64_t pagesize_;
+ ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;
std::string parquet_created_by_;
diff --git a/cpp/src/parquet/properties_test.cc b/cpp/src/parquet/properties_test.cc
index 94ff79a..aef563b 100644
--- a/cpp/src/parquet/properties_test.cc
+++ b/cpp/src/parquet/properties_test.cc
@@ -43,7 +43,8 @@ TEST(TestWriterProperties, Basics) {
ASSERT_EQ(kDefaultDataPageSize, props->data_pagesize());
ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
- ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
+ ASSERT_EQ(ParquetVersion::PARQUET_1_0, props->version());
+ ASSERT_EQ(ParquetDataPageVersion::V1, props->data_page_version());
}
TEST(TestWriterProperties, AdvancedHandling) {
@@ -53,6 +54,7 @@ TEST(TestWriterProperties, AdvancedHandling) {
builder.compression(Compression::SNAPPY);
builder.encoding(Encoding::DELTA_BINARY_PACKED);
builder.encoding("delta-length", Encoding::DELTA_LENGTH_BYTE_ARRAY);
+ builder.data_page_version(ParquetDataPageVersion::V2);
std::shared_ptr<WriterProperties> props = builder.build();
ASSERT_EQ(Compression::GZIP, props->compression(ColumnPath::FromDotString("gzip")));
@@ -63,6 +65,7 @@ TEST(TestWriterProperties, AdvancedHandling) {
props->encoding(ColumnPath::FromDotString("gzip")));
ASSERT_EQ(Encoding::DELTA_LENGTH_BYTE_ARRAY,
props->encoding(ColumnPath::FromDotString("delta-length")));
+ ASSERT_EQ(ParquetDataPageVersion::V2, props->data_page_version());
}
TEST(TestReaderProperties, GetStreamInsufficientData) {
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index e737113..2b370b3 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -347,6 +347,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
cdef cppclass WriterProperties:
cppclass Builder:
+ Builder* data_page_version(ParquetDataPageVersion version)
Builder* version(ParquetVersion version)
Builder* compression(ParquetCompression codec)
Builder* compression(const c_string& path,
@@ -443,6 +444,14 @@ cdef extern from "parquet/properties.h" namespace "parquet" nogil:
V1 "parquet::ArrowWriterProperties::V1",
V2 "parquet::ArrowWriterProperties::V2"
+ cdef cppclass ParquetDataPageVersion:
+ pass
+
+ cdef ParquetDataPageVersion ParquetDataPageVersion_V1 \
+ " parquet::ParquetDataPageVersion::V1"
+ cdef ParquetDataPageVersion ParquetDataPageVersion_V2 \
+ " parquet::ParquetDataPageVersion::V2"
+
cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
cdef cppclass FileWriter:
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index a8dbc0e..de9b23a 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1207,6 +1207,7 @@ cdef class ParquetWriter:
object allow_truncated_timestamps
object compression
object compression_level
+ object data_page_version
object version
object write_statistics
object writer_engine_version
@@ -1223,7 +1224,8 @@ cdef class ParquetWriter:
allow_truncated_timestamps=False,
compression_level=None,
use_byte_stream_split=False,
- writer_engine_version=None):
+ writer_engine_version=None,
+ data_page_version=None):
cdef:
shared_ptr[WriterProperties] properties
c_string c_where
@@ -1250,8 +1252,10 @@ cdef class ParquetWriter:
self.allow_truncated_timestamps = allow_truncated_timestamps
self.use_byte_stream_split = use_byte_stream_split
self.writer_engine_version = writer_engine_version
+ self.data_page_version = data_page_version
cdef WriterProperties.Builder properties_builder
+ self._set_data_page_version(&properties_builder)
self._set_version(&properties_builder)
self._set_compression_props(&properties_builder)
self._set_dictionary_props(&properties_builder)
@@ -1324,6 +1328,17 @@ cdef class ParquetWriter:
raise ValueError("Unsupported Parquet format version: {0}"
.format(self.version))
+ cdef int _set_data_page_version(self, WriterProperties.Builder* props) \
+ except -1:
+ if self.data_page_version is not None:
+ if self.data_page_version == "1.0":
+ props.data_page_version(ParquetDataPageVersion_V1)
+ elif self.data_page_version == "2.0":
+ props.data_page_version(ParquetDataPageVersion_V2)
+ else:
+ raise ValueError("Unsupported Parquet data page version: {0}"
+ .format(self.data_page_version))
+
cdef void _set_compression_props(self, WriterProperties.Builder* props) \
except *:
if isinstance(self.compression, basestring):
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index fcd7454..3158e8f 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -432,7 +432,15 @@ def _sanitize_table(table, new_schema, flavor):
_parquet_writer_arg_docs = """version : {"1.0", "2.0"}, default "1.0"
- The Parquet format version, defaults to 1.0.
+ Determine which Parquet logical types are available for use, whether the
+ reduced set from the Parquet 1.x.x format or the expanded logical types
+ added in format version 2.0.0 and after. Note that files written with
+ version='2.0' may not be readable in all Parquet implementations, so
+ version='1.0' is likely the choice that maximizes file compatibility. Some
+ features, such as lossless storage of nanosecond timestamps as INT64
+ physical storage, are only available with version='2.0'. The Parquet 2.0.0
+ format version also introduced a new serialized data page format; this can
+ be enabled separately using the data_page_version option.
use_dictionary : bool or list
Specify if we should use dictionary encoding in general or only for
some columns.
@@ -481,6 +489,10 @@ writer_engine_version: str, default "V2"
all nested types. V1 is legacy and will be removed in a future release.
Setting the environment variable ARROW_PARQUET_WRITER_ENGINE will
override the default.
+data_page_version : {"1.0", "2.0"}, default "1.0"
+ The serialized Parquet data page format version to write, defaults to
+ 1.0. This does not impact the file schema logical types and Arrow to
+ Parquet type casting behavior; for that use the "version" option.
"""
@@ -511,6 +523,7 @@ schema : arrow Schema
compression_level=None,
use_byte_stream_split=False,
writer_engine_version=None,
+ data_page_version='1.0',
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -549,6 +562,7 @@ schema : arrow Schema
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
writer_engine_version=engine_version,
+ data_page_version=data_page_version,
**options)
self.is_open = True
@@ -1584,6 +1598,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
filesystem=None,
compression_level=None,
use_byte_stream_split=False,
+ data_page_version='1.0',
**kwargs):
row_group_size = kwargs.pop('chunk_size', row_group_size)
use_int96 = use_deprecated_int96_timestamps
@@ -1602,6 +1617,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
use_deprecated_int96_timestamps=use_int96,
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
+ data_page_version=data_page_version,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
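To make the distinction drawn in the docstring additions above concrete, a small
sketch (again with a placeholder file name) showing that the two options vary
independently:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'a': [1, 2, 3]})

    # 'version' selects which logical types may be written (the expanded
    # format 2.0.0 set, e.g. nanosecond timestamps), while 'data_page_version'
    # only selects the serialized data page format; here the newer logical
    # types are combined with the more widely readable V1 data pages.
    pq.write_table(table, 'mixed.parquet',
                   version='2.0', data_page_version='1.0')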
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index f215eaa..5e6e227 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -210,6 +210,10 @@ def test_parquet_invalid_version(tempdir):
table = pa.table({'a': [1, 2, 3]})
with pytest.raises(ValueError, match="Unsupported Parquet format version"):
_write_table(table, tempdir / 'test_version.parquet', version="2.2")
+ with pytest.raises(ValueError, match="Unsupported Parquet data page " +
+ "version"):
+ _write_table(table, tempdir / 'test_version.parquet',
+ data_page_version="2.2")
@parametrize_legacy_dataset
@@ -230,16 +234,19 @@ def test_chunked_table_write(use_legacy_dataset):
# ARROW-232
df = alltypes_sample(size=10)
- batch = pa.RecordBatch.from_pandas(df)
- table = pa.Table.from_batches([batch] * 3)
- _check_roundtrip(
- table, version='2.0', use_legacy_dataset=use_legacy_dataset)
+ for data_page_version in ['1.0', '2.0']:
+ batch = pa.RecordBatch.from_pandas(df)
+ table = pa.Table.from_batches([batch] * 3)
+ _check_roundtrip(
+ table, version='2.0', use_legacy_dataset=use_legacy_dataset,
+ data_page_version=data_page_version)
- df, _ = dataframe_with_lists()
- batch = pa.RecordBatch.from_pandas(df)
- table = pa.Table.from_batches([batch] * 3)
- _check_roundtrip(
- table, version='2.0', use_legacy_dataset=use_legacy_dataset)
+ df, _ = dataframe_with_lists()
+ batch = pa.RecordBatch.from_pandas(df)
+ table = pa.Table.from_batches([batch] * 3)
+ _check_roundtrip(
+ table, version='2.0', use_legacy_dataset=use_legacy_dataset,
+ data_page_version=data_page_version)
@pytest.mark.pandas
@@ -3738,7 +3745,7 @@ def test_multi_dataset_metadata(tempdir):
'one': [1, 2, 3],
'two': [-1, -2, -3],
'three': [[1, 2], [2, 3], [3, 4]],
- })
+ })
table = pa.Table.from_pandas(df)
# write dataset twice and collect/merge metadata