Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/20 07:16:15 UTC
(arrow) branch main updated: GH-37242: [Python][Parquet] Parquet Support write and validate Page CRC (#38360)
This is an automated email from the ASF dual-hosted git repository.
alenka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 68ba49d501 GH-37242: [Python][Parquet] Parquet Support write and validate Page CRC (#38360)
68ba49d501 is described below
commit 68ba49d5018afd25dd0516b942d7c438d51b691e
Author: frazar <91...@users.noreply.github.com>
AuthorDate: Mon Nov 20 08:16:05 2023 +0100
GH-37242: [Python][Parquet] Parquet Support write and validate Page CRC (#38360)
### Rationale for this change
The C++ Parquet API already supports enabling CRC checksums for read and write operations.
CRC checksums are optional and can detect data corruption due to, for example, file storage issues or [cosmic rays](https://en.wikipedia.org/wiki/Soft_error).
It is therefore beneficial to expose this optional functionality in the Python API as well.
This PR is based on a previous PR that became stale: #37439
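As a minimal sketch of the intended round trip (using the `write_page_checksum` and `page_checksum_verification` keywords added in the diff below; the file path is illustrative):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'a': [1, 2, 3, 4]})

# Write the file with per-page CRC checksums enabled
pq.write_table(table, 'data.parquet', write_page_checksum=True)

# Verify each page's CRC while reading; a corrupted page raises an OSError
result = pq.read_table('data.parquet', page_checksum_verification=True)
assert result == table
```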
### What changes are included in this PR?
The PyArrow interface is expanded with a `write_page_checksum` flag on the write path and a `page_checksum_verification` flag on the read path.
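For the dataset API, the write side is exposed through `ParquetFileFormat.make_write_options()` and the read side through `ParquetFragmentScanOptions`; a short sketch mirroring the tests added below (`'dataset_dir'` is illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({'a': [1, 2, 3, 4]})

# Write a dataset with page checksums enabled
fmt = ds.ParquetFileFormat()
write_options = fmt.make_write_options(write_page_checksum=True)
ds.write_dataset(table, 'dataset_dir', format=fmt, file_options=write_options)

# Read it back with per-page CRC verification enabled
scan_opts = ds.ParquetFragmentScanOptions(page_checksum_verification=True)
read_fmt = ds.ParquetFileFormat(default_fragment_scan_options=scan_opts)
checked = ds.dataset('dataset_dir', format=read_fmt).to_table()
```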
### Are these changes tested?
[x] Yes, tests are included in `python/pyarrow/tests/parquet/test_basic.py` and `python/pyarrow/tests/test_dataset.py` (see the diff below).
### Are there any user-facing changes?
The change is backward compatible: the new keyword arguments are optional and default to the previous behavior (checksums disabled).
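When verification is enabled and a page fails its CRC check, the read raises an `OSError` whose message mentions "CRC checksum verification" (matching the new tests); a caller can handle this explicitly, e.g.:

```python
import pyarrow.parquet as pq

try:
    table = pq.read_table('maybe_corrupted.parquet',  # illustrative path
                          page_checksum_verification=True)
except OSError as exc:
    # Raised when a page's stored CRC does not match the data read back
    print(f"CRC checksum verification failed: {exc}")
```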
Supersedes #37439
* Closes: #37242
Lead-authored-by: Francesco Zardi <fr...@hotmail.it>
Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
Co-authored-by: mwish <ma...@gmail.com>
Co-authored-by: Alenka Frim <Al...@users.noreply.github.com>
Signed-off-by: AlenkaF <fr...@gmail.com>
---
cpp/src/arrow/dataset/file_parquet.cc | 4 +
python/pyarrow/_dataset_parquet.pyx | 22 +++-
python/pyarrow/_parquet.pxd | 8 +-
python/pyarrow/_parquet.pyx | 22 +++-
python/pyarrow/parquet/core.py | 37 +++++-
python/pyarrow/tests/parquet/test_basic.py | 132 +++++++++++++++++++++
.../pyarrow/tests/parquet/test_parquet_writer.py | 4 +-
python/pyarrow/tests/test_dataset.py | 80 +++++++++++++
8 files changed, 295 insertions(+), 14 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 65ad70181f..3afe4ec85c 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -98,6 +98,10 @@ parquet::ReaderProperties MakeReaderProperties(
parquet_scan_options->reader_properties->thrift_string_size_limit());
properties.set_thrift_container_size_limit(
parquet_scan_options->reader_properties->thrift_container_size_limit());
+
+ properties.set_page_checksum_verification(
+ parquet_scan_options->reader_properties->page_checksum_verification());
+
return properties;
}
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 31aa058706..bd4151624d 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -606,6 +606,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
write_batch_size=self._properties["write_batch_size"],
dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"],
write_page_index=self._properties["write_page_index"],
+ write_page_checksum=self._properties["write_page_checksum"],
)
def _set_arrow_properties(self):
@@ -655,6 +656,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
dictionary_pagesize_limit=None,
write_page_index=False,
encryption_config=None,
+ write_page_checksum=False,
)
self._set_properties()
@@ -701,6 +703,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None
If not None, use the provided ParquetDecryptionConfig to decrypt the
Parquet file.
+ page_checksum_verification : bool, default False
+ If True, verify the page checksum for each page read from the file.
"""
# Avoid mistakenly creating attributes
@@ -711,7 +715,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
bint pre_buffer=True,
thrift_string_size_limit=None,
thrift_container_size_limit=None,
- decryption_config=None):
+ decryption_config=None,
+ bint page_checksum_verification=False):
self.init(shared_ptr[CFragmentScanOptions](
new CParquetFragmentScanOptions()))
self.use_buffered_stream = use_buffered_stream
@@ -723,6 +728,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
self.thrift_container_size_limit = thrift_container_size_limit
if decryption_config is not None:
self.parquet_decryption_config = decryption_config
+ self.page_checksum_verification = page_checksum_verification
cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
@@ -802,6 +808,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
set_decryption_config(self, config)
self._parquet_decryption_config = config
+ @property
+ def page_checksum_verification(self):
+ return self.reader_properties().page_checksum_verification()
+
+ @page_checksum_verification.setter
+ def page_checksum_verification(self, bint page_checksum_verification):
+ self.reader_properties().set_page_checksum_verification(page_checksum_verification)
+
def equals(self, ParquetFragmentScanOptions other):
"""
Parameters
@@ -814,11 +828,12 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
"""
attrs = (
self.use_buffered_stream, self.buffer_size, self.pre_buffer,
- self.thrift_string_size_limit, self.thrift_container_size_limit)
+ self.thrift_string_size_limit, self.thrift_container_size_limit,
+ self.page_checksum_verification)
other_attrs = (
other.use_buffered_stream, other.buffer_size, other.pre_buffer,
other.thrift_string_size_limit,
- other.thrift_container_size_limit)
+ other.thrift_container_size_limit, other.page_checksum_verification)
return attrs == other_attrs
@staticmethod
@@ -835,6 +850,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
pre_buffer=self.pre_buffer,
thrift_string_size_limit=self.thrift_string_size_limit,
thrift_container_size_limit=self.thrift_container_size_limit,
+ page_checksum_verification=self.page_checksum_verification
)
return ParquetFragmentScanOptions._reconstruct, (kwargs,)
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 39cdcc063b..59b50ceda8 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -380,6 +380,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
const
+ c_bool page_checksum_verification() const
+ void set_page_checksum_verification(c_bool check_crc)
+
CReaderProperties default_reader_properties()
cdef cppclass ArrowReaderProperties:
@@ -428,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
Builder* enable_write_page_index()
Builder* disable_write_page_index()
+ Builder* enable_page_checksum()
+ Builder* disable_page_checksum()
shared_ptr[WriterProperties] build()
cdef cppclass ArrowWriterProperties:
@@ -576,7 +581,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
FileEncryptionProperties encryption_properties=*,
write_batch_size=*,
dictionary_pagesize_limit=*,
- write_page_index=*) except *
+ write_page_index=*,
+ write_page_checksum=*) except *
cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 089ed7c75c..737ba9d0a8 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1183,7 +1183,8 @@ cdef class ParquetReader(_Weakrefable):
coerce_int96_timestamp_unit=None,
FileDecryptionProperties decryption_properties=None,
thrift_string_size_limit=None,
- thrift_container_size_limit=None):
+ thrift_container_size_limit=None,
+ page_checksum_verification=False):
"""
Open a parquet file for reading.
@@ -1199,6 +1200,7 @@ cdef class ParquetReader(_Weakrefable):
decryption_properties : FileDecryptionProperties, optional
thrift_string_size_limit : int, optional
thrift_container_size_limit : int, optional
+ page_checksum_verification : bool, default False
"""
cdef:
shared_ptr[CFileMetaData] c_metadata
@@ -1236,6 +1238,8 @@ cdef class ParquetReader(_Weakrefable):
arrow_props.set_pre_buffer(pre_buffer)
+ properties.set_page_checksum_verification(page_checksum_verification)
+
if coerce_int96_timestamp_unit is None:
# use the default defined in default_arrow_reader_properties()
pass
@@ -1559,7 +1563,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
FileEncryptionProperties encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
- write_page_index=False) except *:
+ write_page_index=False,
+ write_page_checksum=False) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
@@ -1703,6 +1708,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
# a size larger than this then it will be latched to this value.
props.max_row_group_length(_MAX_ROW_GROUP_SIZE)
+ # checksum
+
+ if write_page_checksum:
+ props.enable_page_checksum()
+ else:
+ props.disable_page_checksum()
+
# page index
if write_page_index:
@@ -1822,7 +1834,8 @@ cdef class ParquetWriter(_Weakrefable):
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
- write_page_index=False):
+ write_page_index=False,
+ write_page_checksum=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1853,7 +1866,8 @@ cdef class ParquetWriter(_Weakrefable):
encryption_properties=encryption_properties,
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
- write_page_index=write_page_index
+ write_page_index=write_page_index,
+ write_page_checksum=write_page_checksum
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index 072ab7fa11..096e960384 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -280,6 +280,8 @@ class ParquetFile:
If nothing passed, will be inferred based on path.
Path will try to be found in the local on-disk filesystem otherwise
it will be parsed as a URI to determine the filesystem.
+ page_checksum_verification : bool, default False
+ If True, verify the checksum for each page read from the file.
Examples
--------
@@ -327,7 +329,8 @@ class ParquetFile:
read_dictionary=None, memory_map=False, buffer_size=0,
pre_buffer=False, coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
- thrift_container_size_limit=None, filesystem=None):
+ thrift_container_size_limit=None, filesystem=None,
+ page_checksum_verification=False):
self._close_source = getattr(source, 'closed', True)
@@ -346,6 +349,7 @@ class ParquetFile:
decryption_properties=decryption_properties,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
+ page_checksum_verification=page_checksum_verification,
)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
@@ -887,6 +891,10 @@ write_page_index : bool, default False
filtering more efficient than the page header, as it gathers all the
statistics for a Parquet file in a single place, avoiding scattered I/O.
Note that the page index is not yet used on the read side by PyArrow.
+write_page_checksum : bool, default False
+ Whether to write page checksums for all columns.
+ Page checksums enable detection of data corruption, which might occur
+ during transmission or in storage.
"""
_parquet_writer_example_doc = """\
@@ -980,6 +988,7 @@ Examples
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
+ write_page_checksum=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -1037,6 +1046,7 @@ Examples
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
+ write_page_checksum=write_page_checksum,
**options)
self.is_open = True
@@ -1766,6 +1776,8 @@ thrift_container_size_limit : int, default None
If not None, override the maximum total size of containers allocated
when decoding Thrift structures. The default limit should be
sufficient for most Parquet files.
+page_checksum_verification : bool, default False
+ If True, verify the page checksum for each page read from the file.
Examples
--------
@@ -1779,7 +1791,8 @@ Examples
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
thrift_string_size_limit=None,
- thrift_container_size_limit=None):
+ thrift_container_size_limit=None,
+ page_checksum_verification=False):
extra_msg = ""
if use_legacy_dataset is None:
@@ -1812,6 +1825,7 @@ Examples
metadata_nthreads=metadata_nthreads,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
+ page_checksum_verification=page_checksum_verification,
)
warnings.warn(
"Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
@@ -1828,7 +1842,8 @@ Examples
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
thrift_string_size_limit=None,
- thrift_container_size_limit=None):
+ thrift_container_size_limit=None,
+ page_checksum_verification=False):
if partitioning != "hive":
raise ValueError(
'Only "hive" for hive-like partitioning is supported when '
@@ -2419,6 +2434,7 @@ class _ParquetDatasetV2:
coerce_int96_timestamp_unit=None, schema=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None,
+ page_checksum_verification=False,
**kwargs):
import pyarrow.dataset as ds
@@ -2437,6 +2453,7 @@ class _ParquetDatasetV2:
"coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
"thrift_string_size_limit": thrift_string_size_limit,
"thrift_container_size_limit": thrift_container_size_limit,
+ "page_checksum_verification": page_checksum_verification,
}
if buffer_size:
read_options.update(use_buffered_stream=True,
@@ -2855,6 +2872,8 @@ thrift_container_size_limit : int, default None
If not None, override the maximum total size of containers allocated
when decoding Thrift structures. The default limit should be
sufficient for most Parquet files.
+page_checksum_verification : bool, default False
+ If True, verify the checksum for each page read from the file.
Returns
-------
@@ -2949,7 +2968,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
ignore_prefixes=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
- thrift_container_size_limit=None):
+ thrift_container_size_limit=None,
+ page_checksum_verification=False):
if not use_legacy_dataset:
if metadata is not None:
raise ValueError(
@@ -2973,6 +2993,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
+ page_checksum_verification=page_checksum_verification,
)
except ImportError:
# fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -3004,6 +3025,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
decryption_properties=decryption_properties,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
+ page_checksum_verification=page_checksum_verification,
)
return dataset.read(columns=columns, use_threads=use_threads,
@@ -3020,6 +3042,11 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
"The 'ignore_prefixes' keyword is only supported when "
"use_legacy_dataset=False")
+ if page_checksum_verification:
+ raise ValueError(
+ "The 'page_checksum_verification' keyword is only supported when "
+ "use_legacy_dataset=False")
+
if schema is not None:
raise ValueError(
"The 'schema' argument is only supported when "
@@ -3101,6 +3128,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
+ write_page_checksum=False,
**kwargs):
# Implementor's note: when adding keywords here / updating defaults, also
# update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -3129,6 +3157,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
+ write_page_checksum=write_page_checksum,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index dd12a26616..26c52b1cc5 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -18,6 +18,7 @@
from collections import OrderedDict
import io
import warnings
+from shutil import copytree
import numpy as np
import pytest
@@ -882,3 +883,134 @@ def test_thrift_size_limits(tempdir):
assert got == table
got = pq.read_table(path)
assert got == table
+
+
+def test_page_checksum_verification_write_table(tempdir):
+ """Check that checksum verification works for datasets created with
+ pq.write_table()"""
+
+ # Write some sample data into a parquet file with page checksum enabled
+ original_path = tempdir / 'correct.parquet'
+ table_orig = pa.table({'a': [1, 2, 3, 4]})
+ pq.write_table(table_orig, original_path, write_page_checksum=True)
+
+ # Read file and verify that the data is correct
+ table_check = pq.read_table(original_path, page_checksum_verification=True)
+ assert table_orig == table_check
+
+ # Read the original file as binary and swap the 31st and 36th bytes. This
+ # should be equivalent to storing the following data:
+ # pa.table({'a': [1, 3, 2, 4]})
+ bin_data = bytearray(original_path.read_bytes())
+
+ # Swap two bytes to emulate corruption. Also, check that the two bytes are
+ # different, otherwise no corruption occurs
+ assert bin_data[31] != bin_data[36]
+ bin_data[31], bin_data[36] = bin_data[36], bin_data[31]
+
+ # Write the corrupted data to another parquet file
+ corrupted_path = tempdir / 'corrupted.parquet'
+ corrupted_path.write_bytes(bin_data)
+
+ # Case 1: Reading the corrupted file with read_table() and without page
+ # checksum verification succeeds but yields corrupted data
+ table_corrupt = pq.read_table(corrupted_path,
+ page_checksum_verification=False)
+ # The read should complete without error, but the table has different
+ # content than the original file!
+ assert table_corrupt != table_orig
+ assert table_corrupt == pa.table({'a': [1, 3, 2, 4]})
+
+ # Case 2: Reading the corrupted file with read_table() and with page
+ # checksum verification enabled raises an exception
+ with pytest.raises(OSError, match="CRC checksum verification"):
+ _ = pq.read_table(corrupted_path, page_checksum_verification=True)
+
+ # Case 3: Reading the corrupted file with ParquetFile.read() and without
+ # page checksum verification succeeds but yields corrupted data
+ corrupted_pq_file = pq.ParquetFile(corrupted_path,
+ page_checksum_verification=False)
+ table_corrupt2 = corrupted_pq_file.read()
+ assert table_corrupt2 != table_orig
+ assert table_corrupt2 == pa.table({'a': [1, 3, 2, 4]})
+
+ # Case 4: Reading the corrupted file with ParquetFile.read() and with page
+ # checksum verification enabled raises an exception
+ corrupted_pq_file = pq.ParquetFile(corrupted_path,
+ page_checksum_verification=True)
+ # Accessing the data should result in an error
+ with pytest.raises(OSError, match="CRC checksum verification"):
+ _ = corrupted_pq_file.read()
+
+ # Case 5: Check that enabling page checksum verification in combination
+ # with legacy dataset raises an exception
+ with pytest.raises(ValueError, match="page_checksum_verification"):
+ _ = pq.read_table(corrupted_path,
+ page_checksum_verification=True,
+ use_legacy_dataset=True)
+
+
+@pytest.mark.dataset
+@pytest.mark.parametrize(
+ "use_legacy_dataset",
+ [
+ False,
+ pytest.param(
+ True,
+ marks=pytest.mark.filterwarnings(
+ "ignore:Passing 'use_legacy_dataset=True':FutureWarning"
+ ),
+ ),
+ ],
+)
+def test_checksum_write_to_dataset(tempdir, use_legacy_dataset):
+ """Check that checksum verification works for datasets created with
+ pq.write_to_dataset"""
+
+ table_orig = pa.table({'a': [1, 2, 3, 4]})
+
+ # Write a sample dataset with page checksum enabled
+ original_dir_path = tempdir / 'correct_dir'
+ pq.write_to_dataset(table_orig,
+ original_dir_path,
+ write_page_checksum=True,
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Read file and verify that the data is correct
+ original_file_path_list = list(original_dir_path.iterdir())
+ assert len(original_file_path_list) == 1
+ original_path = original_file_path_list[0]
+ table_check = pq.read_table(original_path, page_checksum_verification=True)
+ assert table_orig == table_check
+
+ # Read the original file as binary and swap the 31st and 36th bytes. This
+ # should be equivalent to storing the following data:
+ # pa.table({'a': [1, 3, 2, 4]})
+ bin_data = bytearray(original_path.read_bytes())
+
+ # Swap two bytes to emulate corruption. Also, check that the two bytes are
+ # different, otherwise no corruption occurs
+ assert bin_data[31] != bin_data[36]
+ bin_data[31], bin_data[36] = bin_data[36], bin_data[31]
+
+ # Write the corrupted data to another parquet dataset
+ # Copy dataset dir (which should be just one file)
+ corrupted_dir_path = tempdir / 'corrupted_dir'
+ copytree(original_dir_path, corrupted_dir_path)
+ # Corrupt just the one file with the dataset
+ corrupted_file_path = corrupted_dir_path / original_path.name
+ corrupted_file_path.write_bytes(bin_data)
+
+ # Case 1: Reading the corrupted file with read_table() and without page
+ # checksum verification succeeds but yields corrupted data
+ table_corrupt = pq.read_table(corrupted_file_path,
+ page_checksum_verification=False)
+ # The read should complete without error, but the table has different
+ # content than the original file!
+ assert table_corrupt != table_orig
+ assert table_corrupt == pa.table({'a': [1, 3, 2, 4]})
+
+ # Case 2: Reading the corrupted file with read_table() and with page
+ # checksum verification enabled raises an exception
+ with pytest.raises(OSError, match="CRC checksum verification"):
+ _ = pq.read_table(corrupted_file_path, page_checksum_verification=True)
diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py
index 5e6895c8dc..b902541015 100644
--- a/python/pyarrow/tests/parquet/test_parquet_writer.py
+++ b/python/pyarrow/tests/parquet/test_parquet_writer.py
@@ -94,14 +94,14 @@ def test_validate_schema_write_table(tempdir):
w.write_table(simple_table)
-def test_parquet_invalid_writer():
+def test_parquet_invalid_writer(tempdir):
# avoid segfaults with invalid construction
with pytest.raises(TypeError):
some_schema = pa.schema([pa.field("x", pa.int32())])
pq.ParquetWriter(None, some_schema)
with pytest.raises(TypeError):
- pq.ParquetWriter("some_path", None)
+ pq.ParquetWriter(tempdir / "some_path", None)
@pytest.mark.pandas
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 6f3b54b0cd..c6967326b3 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -25,6 +25,7 @@ import textwrap
import tempfile
import threading
import time
+from shutil import copytree
from urllib.parse import quote
@@ -788,12 +789,15 @@ def test_parquet_scan_options():
opts5 = ds.ParquetFragmentScanOptions(
thrift_string_size_limit=123456,
thrift_container_size_limit=987654,)
+ opts6 = ds.ParquetFragmentScanOptions(
+ page_checksum_verification=True)
assert opts1.use_buffered_stream is False
assert opts1.buffer_size == 2**13
assert opts1.pre_buffer is True
assert opts1.thrift_string_size_limit == 100_000_000 # default in C++
assert opts1.thrift_container_size_limit == 1_000_000 # default in C++
+ assert opts1.page_checksum_verification is False
assert opts2.use_buffered_stream is False
assert opts2.buffer_size == 2**12
@@ -810,11 +814,14 @@ def test_parquet_scan_options():
assert opts5.thrift_string_size_limit == 123456
assert opts5.thrift_container_size_limit == 987654
+ assert opts6.page_checksum_verification is True
+
assert opts1 == opts1
assert opts1 != opts2
assert opts2 != opts3
assert opts3 != opts4
assert opts5 != opts1
+ assert opts6 != opts1
def test_file_format_pickling(pickle_module):
@@ -5376,3 +5383,76 @@ def test_dataset_sort_by(tempdir, dstype):
sorted_tab_dict = sorted_tab.to_table().to_pydict()
assert sorted_tab_dict["a"] == [5, 7, 7, 35]
assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"]
+
+
+def test_checksum_write_dataset_read_dataset_to_table(tempdir):
+ """Check that checksum verification works for datasets created with
+ ds.write_dataset and read with ds.dataset.to_table"""
+
+ table_orig = pa.table({'a': [1, 2, 3, 4]})
+
+ # Write a sample dataset with page checksum enabled
+ pq_write_format = pa.dataset.ParquetFileFormat()
+ write_options = pq_write_format.make_write_options(
+ write_page_checksum=True)
+
+ original_dir_path = tempdir / 'correct_dir'
+ ds.write_dataset(
+ data=table_orig,
+ base_dir=original_dir_path,
+ format=pq_write_format,
+ file_options=write_options,
+ )
+
+ # Open dataset and verify that the data is correct
+ pq_scan_opts_crc = ds.ParquetFragmentScanOptions(
+ page_checksum_verification=True)
+ pq_read_format_crc = pa.dataset.ParquetFileFormat(
+ default_fragment_scan_options=pq_scan_opts_crc)
+ table_check = ds.dataset(
+ original_dir_path,
+ format=pq_read_format_crc
+ ).to_table()
+ assert table_orig == table_check
+
+ # Copy dataset dir (which should be just one file)
+ corrupted_dir_path = tempdir / 'corrupted_dir'
+ copytree(original_dir_path, corrupted_dir_path)
+
+ # Read the only file in the path as binary and swap the 31st and 36th
+ # bytes. This should be equivalent to storing the following data:
+ # pa.table({'a': [1, 3, 2, 4]})
+ corrupted_file_path_list = list(corrupted_dir_path.iterdir())
+ assert len(corrupted_file_path_list) == 1
+ corrupted_file_path = corrupted_file_path_list[0]
+ bin_data = bytearray(corrupted_file_path.read_bytes())
+
+ # Swap two bytes to emulate corruption. Also, check that the two bytes are
+ # different, otherwise no corruption occurs
+ assert bin_data[31] != bin_data[36]
+ bin_data[31], bin_data[36] = bin_data[36], bin_data[31]
+
+ # Write the corrupted data to the parquet file
+ corrupted_file_path.write_bytes(bin_data)
+
+ # Case 1: Reading the corrupted file with dataset().to_table() and without
+ # page checksum verification succeeds but yields corrupted data
+ pq_scan_opts_no_crc = ds.ParquetFragmentScanOptions(
+ page_checksum_verification=False)
+ pq_read_format_no_crc = pa.dataset.ParquetFileFormat(
+ default_fragment_scan_options=pq_scan_opts_no_crc)
+ table_corrupt = ds.dataset(
+ corrupted_dir_path, format=pq_read_format_no_crc).to_table()
+
+ # The read should complete without error, but the table has different
+ # content than the original file!
+ assert table_corrupt != table_orig
+ assert table_corrupt == pa.table({'a': [1, 3, 2, 4]})
+
+ # Case 2: Reading the corrupted file with read_table() and with page
+ # checksum verification enabled raises an exception
+ with pytest.raises(OSError, match="CRC checksum verification"):
+ _ = ds.dataset(
+ corrupted_dir_path,
+ format=pq_read_format_crc
+ ).to_table()