You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/20 07:16:15 UTC

(arrow) branch main updated: GH-37242: [Python][Parquet] Parquet Support write and validate Page CRC (#38360)

This is an automated email from the ASF dual-hosted git repository.

alenka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 68ba49d501 GH-37242: [Python][Parquet] Parquet Support write and validate Page CRC (#38360)
68ba49d501 is described below

commit 68ba49d5018afd25dd0516b942d7c438d51b691e
Author: frazar <91...@users.noreply.github.com>
AuthorDate: Mon Nov 20 08:16:05 2023 +0100

    GH-37242: [Python][Parquet] Parquet Support write and validate Page CRC (#38360)
    
    ### Rationale for this change
    
    The C++ Parquet API already supports enabling CRC checksum for read and write operations.
    
    CRC checksum are optional and can detect data corruption due to, for example, file storage issues or [cosmic rays](https://en.wikipedia.org/wiki/Soft_error).
    
    It would then be beneficial to expose this optional functionality to the Python API too.
    
    This PR is based on a previous PR which became stale: #37439
    
    ### What changes are included in this PR?
    
    The PyArrow interface is expanded to include a `page_checksum_enabled` flag.
    
    ### Are these changes tested?
    
    [ ] NOT YET!
    
    ### Are there any user-facing changes?
    
    The change is backward compatible. An additional, optional keyword argument is added to some interfaces.
    
    Closes #37242
    Supersedes #37439
    * Closes: #37242
    
    Lead-authored-by: Francesco Zardi <fr...@hotmail.it>
    Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
    Co-authored-by: mwish <ma...@gmail.com>
    Co-authored-by: Alenka Frim <Al...@users.noreply.github.com>
    Signed-off-by: AlenkaF <fr...@gmail.com>
---
 cpp/src/arrow/dataset/file_parquet.cc              |   4 +
 python/pyarrow/_dataset_parquet.pyx                |  22 +++-
 python/pyarrow/_parquet.pxd                        |   8 +-
 python/pyarrow/_parquet.pyx                        |  22 +++-
 python/pyarrow/parquet/core.py                     |  37 +++++-
 python/pyarrow/tests/parquet/test_basic.py         | 132 +++++++++++++++++++++
 .../pyarrow/tests/parquet/test_parquet_writer.py   |   4 +-
 python/pyarrow/tests/test_dataset.py               |  80 +++++++++++++
 8 files changed, 295 insertions(+), 14 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 65ad70181f..3afe4ec85c 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -98,6 +98,10 @@ parquet::ReaderProperties MakeReaderProperties(
       parquet_scan_options->reader_properties->thrift_string_size_limit());
   properties.set_thrift_container_size_limit(
       parquet_scan_options->reader_properties->thrift_container_size_limit());
+
+  properties.set_page_checksum_verification(
+      parquet_scan_options->reader_properties->page_checksum_verification());
+
   return properties;
 }
 
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 31aa058706..bd4151624d 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -606,6 +606,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
             write_batch_size=self._properties["write_batch_size"],
             dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"],
             write_page_index=self._properties["write_page_index"],
+            write_page_checksum=self._properties["write_page_checksum"],
         )
 
     def _set_arrow_properties(self):
@@ -655,6 +656,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
             dictionary_pagesize_limit=None,
             write_page_index=False,
             encryption_config=None,
+            write_page_checksum=False,
         )
 
         self._set_properties()
@@ -701,6 +703,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None
         If not None, use the provided ParquetDecryptionConfig to decrypt the
         Parquet file.
+    page_checksum_verification : bool, default False
+        If True, verify the page checksum for each page read from the file.
     """
 
     # Avoid mistakingly creating attributes
@@ -711,7 +715,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
                  bint pre_buffer=True,
                  thrift_string_size_limit=None,
                  thrift_container_size_limit=None,
-                 decryption_config=None):
+                 decryption_config=None,
+                 bint page_checksum_verification=False):
         self.init(shared_ptr[CFragmentScanOptions](
             new CParquetFragmentScanOptions()))
         self.use_buffered_stream = use_buffered_stream
@@ -723,6 +728,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
             self.thrift_container_size_limit = thrift_container_size_limit
         if decryption_config is not None:
             self.parquet_decryption_config = decryption_config
+        self.page_checksum_verification = page_checksum_verification
 
     cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
         FragmentScanOptions.init(self, sp)
@@ -802,6 +808,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         set_decryption_config(self, config)
         self._parquet_decryption_config = config
 
+    @property
+    def page_checksum_verification(self):
+        return self.reader_properties().page_checksum_verification()
+
+    @page_checksum_verification.setter
+    def page_checksum_verification(self, bint page_checksum_verification):
+        self.reader_properties().set_page_checksum_verification(page_checksum_verification)
+
     def equals(self, ParquetFragmentScanOptions other):
         """
         Parameters
@@ -814,11 +828,12 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         """
         attrs = (
             self.use_buffered_stream, self.buffer_size, self.pre_buffer,
-            self.thrift_string_size_limit, self.thrift_container_size_limit)
+            self.thrift_string_size_limit, self.thrift_container_size_limit,
+            self.page_checksum_verification)
         other_attrs = (
             other.use_buffered_stream, other.buffer_size, other.pre_buffer,
             other.thrift_string_size_limit,
-            other.thrift_container_size_limit)
+            other.thrift_container_size_limit, other.page_checksum_verification)
         return attrs == other_attrs
 
     @staticmethod
@@ -835,6 +850,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
             pre_buffer=self.pre_buffer,
             thrift_string_size_limit=self.thrift_string_size_limit,
             thrift_container_size_limit=self.thrift_container_size_limit,
+            page_checksum_verification=self.page_checksum_verification
         )
         return ParquetFragmentScanOptions._reconstruct, (kwargs,)
 
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 39cdcc063b..59b50ceda8 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -380,6 +380,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
             const
 
+        c_bool page_checksum_verification() const
+        void set_page_checksum_verification(c_bool check_crc)
+
     CReaderProperties default_reader_properties()
 
     cdef cppclass ArrowReaderProperties:
@@ -428,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
             Builder* enable_write_page_index()
             Builder* disable_write_page_index()
+            Builder* enable_page_checksum()
+            Builder* disable_page_checksum()
             shared_ptr[WriterProperties] build()
 
     cdef cppclass ArrowWriterProperties:
@@ -576,7 +581,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
     FileEncryptionProperties encryption_properties=*,
     write_batch_size=*,
     dictionary_pagesize_limit=*,
-    write_page_index=*) except *
+    write_page_index=*,
+    write_page_checksum=*) except *
 
 
 cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 089ed7c75c..737ba9d0a8 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1183,7 +1183,8 @@ cdef class ParquetReader(_Weakrefable):
              coerce_int96_timestamp_unit=None,
              FileDecryptionProperties decryption_properties=None,
              thrift_string_size_limit=None,
-             thrift_container_size_limit=None):
+             thrift_container_size_limit=None,
+             page_checksum_verification=False):
         """
         Open a parquet file for reading.
 
@@ -1199,6 +1200,7 @@ cdef class ParquetReader(_Weakrefable):
         decryption_properties : FileDecryptionProperties, optional
         thrift_string_size_limit : int, optional
         thrift_container_size_limit : int, optional
+        page_checksum_verification : bool, default False
         """
         cdef:
             shared_ptr[CFileMetaData] c_metadata
@@ -1236,6 +1238,8 @@ cdef class ParquetReader(_Weakrefable):
 
         arrow_props.set_pre_buffer(pre_buffer)
 
+        properties.set_page_checksum_verification(page_checksum_verification)
+
         if coerce_int96_timestamp_unit is None:
             # use the default defined in default_arrow_reader_properties()
             pass
@@ -1559,7 +1563,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
         FileEncryptionProperties encryption_properties=None,
         write_batch_size=None,
         dictionary_pagesize_limit=None,
-        write_page_index=False) except *:
+        write_page_index=False,
+        write_page_checksum=False) except *:
     """General writer properties"""
     cdef:
         shared_ptr[WriterProperties] properties
@@ -1703,6 +1708,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
     # a size larger than this then it will be latched to this value.
     props.max_row_group_length(_MAX_ROW_GROUP_SIZE)
 
+    # checksum
+
+    if write_page_checksum:
+        props.enable_page_checksum()
+    else:
+        props.disable_page_checksum()
+
     # page index
 
     if write_page_index:
@@ -1822,7 +1834,8 @@ cdef class ParquetWriter(_Weakrefable):
                   write_batch_size=None,
                   dictionary_pagesize_limit=None,
                   store_schema=True,
-                  write_page_index=False):
+                  write_page_index=False,
+                  write_page_checksum=False):
         cdef:
             shared_ptr[WriterProperties] properties
             shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1853,7 +1866,8 @@ cdef class ParquetWriter(_Weakrefable):
             encryption_properties=encryption_properties,
             write_batch_size=write_batch_size,
             dictionary_pagesize_limit=dictionary_pagesize_limit,
-            write_page_index=write_page_index
+            write_page_index=write_page_index,
+            write_page_checksum=write_page_checksum
         )
         arrow_properties = _create_arrow_writer_properties(
             use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index 072ab7fa11..096e960384 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -280,6 +280,8 @@ class ParquetFile:
         If nothing passed, will be inferred based on path.
         Path will try to be found in the local on-disk filesystem otherwise
         it will be parsed as an URI to determine the filesystem.
+    page_checksum_verification : bool, default False
+        If True, verify the checksum for each page read from the file.
 
     Examples
     --------
@@ -327,7 +329,8 @@ class ParquetFile:
                  read_dictionary=None, memory_map=False, buffer_size=0,
                  pre_buffer=False, coerce_int96_timestamp_unit=None,
                  decryption_properties=None, thrift_string_size_limit=None,
-                 thrift_container_size_limit=None, filesystem=None):
+                 thrift_container_size_limit=None, filesystem=None,
+                 page_checksum_verification=False):
 
         self._close_source = getattr(source, 'closed', True)
 
@@ -346,6 +349,7 @@ class ParquetFile:
             decryption_properties=decryption_properties,
             thrift_string_size_limit=thrift_string_size_limit,
             thrift_container_size_limit=thrift_container_size_limit,
+            page_checksum_verification=page_checksum_verification,
         )
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -887,6 +891,10 @@ write_page_index : bool, default False
     filtering more efficient than the page header, as it gathers all the
     statistics for a Parquet file in a single place, avoiding scattered I/O.
     Note that the page index is not yet used on the read size by PyArrow.
+write_page_checksum : bool, default False
+    Whether to write page checksums in general for all columns.
+    Page checksums enable detection of data corruption, which might occur during
+    transmission or in the storage.
 """
 
 _parquet_writer_example_doc = """\
@@ -980,6 +988,7 @@ Examples
                  dictionary_pagesize_limit=None,
                  store_schema=True,
                  write_page_index=False,
+                 write_page_checksum=False,
                  **options):
         if use_deprecated_int96_timestamps is None:
             # Use int96 timestamps for Spark
@@ -1037,6 +1046,7 @@ Examples
             dictionary_pagesize_limit=dictionary_pagesize_limit,
             store_schema=store_schema,
             write_page_index=write_page_index,
+            write_page_checksum=write_page_checksum,
             **options)
         self.is_open = True
 
@@ -1766,6 +1776,8 @@ thrift_container_size_limit : int, default None
     If not None, override the maximum total size of containers allocated
     when decoding Thrift structures. The default limit should be
     sufficient for most Parquet files.
+page_checksum_verification : bool, default False
+    If True, verify the page checksum for each page read from the file.
 
 Examples
 --------
@@ -1779,7 +1791,8 @@ Examples
                 use_legacy_dataset=None, pre_buffer=True,
                 coerce_int96_timestamp_unit=None,
                 thrift_string_size_limit=None,
-                thrift_container_size_limit=None):
+                thrift_container_size_limit=None,
+                page_checksum_verification=False):
 
         extra_msg = ""
         if use_legacy_dataset is None:
@@ -1812,6 +1825,7 @@ Examples
                 metadata_nthreads=metadata_nthreads,
                 thrift_string_size_limit=thrift_string_size_limit,
                 thrift_container_size_limit=thrift_container_size_limit,
+                page_checksum_verification=page_checksum_verification,
             )
         warnings.warn(
             "Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
@@ -1828,7 +1842,8 @@ Examples
                  use_legacy_dataset=None, pre_buffer=True,
                  coerce_int96_timestamp_unit=None,
                  thrift_string_size_limit=None,
-                 thrift_container_size_limit=None):
+                 thrift_container_size_limit=None,
+                 page_checksum_verification=False):
         if partitioning != "hive":
             raise ValueError(
                 'Only "hive" for hive-like partitioning is supported when '
@@ -2419,6 +2434,7 @@ class _ParquetDatasetV2:
                  coerce_int96_timestamp_unit=None, schema=None,
                  decryption_properties=None, thrift_string_size_limit=None,
                  thrift_container_size_limit=None,
+                 page_checksum_verification=False,
                  **kwargs):
         import pyarrow.dataset as ds
 
@@ -2437,6 +2453,7 @@ class _ParquetDatasetV2:
             "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
             "thrift_string_size_limit": thrift_string_size_limit,
             "thrift_container_size_limit": thrift_container_size_limit,
+            "page_checksum_verification": page_checksum_verification,
         }
         if buffer_size:
             read_options.update(use_buffered_stream=True,
@@ -2855,6 +2872,8 @@ thrift_container_size_limit : int, default None
     If not None, override the maximum total size of containers allocated
     when decoding Thrift structures. The default limit should be
     sufficient for most Parquet files.
+page_checksum_verification : bool, default False
+    If True, verify the checksum for each page read from the file.
 
 Returns
 -------
@@ -2949,7 +2968,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
                ignore_prefixes=None, pre_buffer=True,
                coerce_int96_timestamp_unit=None,
                decryption_properties=None, thrift_string_size_limit=None,
-               thrift_container_size_limit=None):
+               thrift_container_size_limit=None,
+               page_checksum_verification=False):
     if not use_legacy_dataset:
         if metadata is not None:
             raise ValueError(
@@ -2973,6 +2993,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
                 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
                 thrift_string_size_limit=thrift_string_size_limit,
                 thrift_container_size_limit=thrift_container_size_limit,
+                page_checksum_verification=page_checksum_verification,
             )
         except ImportError:
             # fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -3004,6 +3025,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
                 decryption_properties=decryption_properties,
                 thrift_string_size_limit=thrift_string_size_limit,
                 thrift_container_size_limit=thrift_container_size_limit,
+                page_checksum_verification=page_checksum_verification,
             )
 
         return dataset.read(columns=columns, use_threads=use_threads,
@@ -3020,6 +3042,11 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
             "The 'ignore_prefixes' keyword is only supported when "
             "use_legacy_dataset=False")
 
+    if page_checksum_verification:
+        raise ValueError(
+            "The 'page_checksum_verification' keyword is only supported when "
+            "use_legacy_dataset=False")
+
     if schema is not None:
         raise ValueError(
             "The 'schema' argument is only supported when "
@@ -3101,6 +3128,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
                 dictionary_pagesize_limit=None,
                 store_schema=True,
                 write_page_index=False,
+                write_page_checksum=False,
                 **kwargs):
     # Implementor's note: when adding keywords here / updating defaults, also
     # update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -3129,6 +3157,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
                 dictionary_pagesize_limit=dictionary_pagesize_limit,
                 store_schema=store_schema,
                 write_page_index=write_page_index,
+                write_page_checksum=write_page_checksum,
                 **kwargs) as writer:
             writer.write_table(table, row_group_size=row_group_size)
     except Exception:
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index dd12a26616..26c52b1cc5 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -18,6 +18,7 @@
 from collections import OrderedDict
 import io
 import warnings
+from shutil import copytree
 
 import numpy as np
 import pytest
@@ -882,3 +883,134 @@ def test_thrift_size_limits(tempdir):
     assert got == table
     got = pq.read_table(path)
     assert got == table
+
+
+def test_page_checksum_verification_write_table(tempdir):
+    """Check that checksum verification works for datasets created with
+    pq.write_table()"""
+
+    # Write some sample data into a parquet file with page checksum enabled
+    original_path = tempdir / 'correct.parquet'
+    table_orig = pa.table({'a': [1, 2, 3, 4]})
+    pq.write_table(table_orig, original_path, write_page_checksum=True)
+
+    # Read file and verify that the data is correct
+    table_check = pq.read_table(original_path, page_checksum_verification=True)
+    assert table_orig == table_check
+
+    # Read the original file as binary and swap the 31-th and 36-th bytes. This
+    # should be equivalent to storing the following data:
+    #    pa.table({'a': [1, 3, 2, 4]})
+    bin_data = bytearray(original_path.read_bytes())
+
+    # Swap two bytes to emulate corruption. Also, check that the two bytes are
+    # different, otherwise no corruption occurs
+    assert bin_data[31] != bin_data[36]
+    bin_data[31], bin_data[36] = bin_data[36], bin_data[31]
+
+    # Write the corrupted data to another parquet file
+    corrupted_path = tempdir / 'corrupted.parquet'
+    corrupted_path.write_bytes(bin_data)
+
+    # Case 1: Reading the corrupted file with read_table() and without page
+    # checksum verification succeeds but yields corrupted data
+    table_corrupt = pq.read_table(corrupted_path,
+                                  page_checksum_verification=False)
+    # The read should complete without error, but the table has different
+    # content than the original file!
+    assert table_corrupt != table_orig
+    assert table_corrupt == pa.table({'a': [1, 3, 2, 4]})
+
+    # Case 2: Reading the corrupted file with read_table() and with page
+    # checksum verification enabled raises an exception
+    with pytest.raises(OSError, match="CRC checksum verification"):
+        _ = pq.read_table(corrupted_path, page_checksum_verification=True)
+
+    # Case 3: Reading the corrupted file with ParquetFile.read() and without
+    # page checksum verification succeeds but yields corrupted data
+    corrupted_pq_file = pq.ParquetFile(corrupted_path,
+                                       page_checksum_verification=False)
+    table_corrupt2 = corrupted_pq_file.read()
+    assert table_corrupt2 != table_orig
+    assert table_corrupt2 == pa.table({'a': [1, 3, 2, 4]})
+
+    # Case 4: Reading the corrupted file with ParquetFile.read() and with page
+    # checksum verification enabled raises an exception
+    corrupted_pq_file = pq.ParquetFile(corrupted_path,
+                                       page_checksum_verification=True)
+    # Accessing the data should result in an error
+    with pytest.raises(OSError, match="CRC checksum verification"):
+        _ = corrupted_pq_file.read()
+
+    # Case 5: Check that enabling page checksum verification in combination
+    # with legacy dataset raises an exception
+    with pytest.raises(ValueError, match="page_checksum_verification"):
+        _ = pq.read_table(corrupted_path,
+                          page_checksum_verification=True,
+                          use_legacy_dataset=True)
+
+
+@pytest.mark.dataset
+@pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=pytest.mark.filterwarnings(
+                "ignore:Passing 'use_legacy_dataset=True':FutureWarning"
+            ),
+        ),
+    ],
+)
+def test_checksum_write_to_dataset(tempdir, use_legacy_dataset):
+    """Check that checksum verification works for datasets created with
+    pq.write_to_dataset"""
+
+    table_orig = pa.table({'a': [1, 2, 3, 4]})
+
+    # Write a sample dataset with page checksum enabled
+    original_dir_path = tempdir / 'correct_dir'
+    pq.write_to_dataset(table_orig,
+                        original_dir_path,
+                        write_page_checksum=True,
+                        use_legacy_dataset=use_legacy_dataset)
+
+    # Read file and verify that the data is correct
+    original_file_path_list = list(original_dir_path.iterdir())
+    assert len(original_file_path_list) == 1
+    original_path = original_file_path_list[0]
+    table_check = pq.read_table(original_path, page_checksum_verification=True)
+    assert table_orig == table_check
+
+    # Read the original file as binary and swap the 31-th and 36-th bytes. This
+    # should be equivalent to storing the following data:
+    #    pa.table({'a': [1, 3, 2, 4]})
+    bin_data = bytearray(original_path.read_bytes())
+
+    # Swap two bytes to emulate corruption. Also, check that the two bytes are
+    # different, otherwise no corruption occurs
+    assert bin_data[31] != bin_data[36]
+    bin_data[31], bin_data[36] = bin_data[36], bin_data[31]
+
+    # Write the corrupted data to another parquet dataset
+    # Copy dataset dir (which should be just one file)
+    corrupted_dir_path = tempdir / 'corrupted_dir'
+    copytree(original_dir_path, corrupted_dir_path)
+    # Corrupt just the one file with the dataset
+    corrupted_file_path = corrupted_dir_path / original_path.name
+    corrupted_file_path.write_bytes(bin_data)
+
+    # Case 1: Reading the corrupted file with read_table() and without page
+    # checksum verification succeeds but yields corrupted data
+    table_corrupt = pq.read_table(corrupted_file_path,
+                                  page_checksum_verification=False)
+    # The read should complete without error, but the table has different
+    # content than the original file!
+    assert table_corrupt != table_orig
+    assert table_corrupt == pa.table({'a': [1, 3, 2, 4]})
+
+    # Case 2: Reading the corrupted file with read_table() and with page
+    # checksum verification enabled raises an exception
+    with pytest.raises(OSError, match="CRC checksum verification"):
+        _ = pq.read_table(corrupted_file_path, page_checksum_verification=True)
diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py
index 5e6895c8dc..b902541015 100644
--- a/python/pyarrow/tests/parquet/test_parquet_writer.py
+++ b/python/pyarrow/tests/parquet/test_parquet_writer.py
@@ -94,14 +94,14 @@ def test_validate_schema_write_table(tempdir):
             w.write_table(simple_table)
 
 
-def test_parquet_invalid_writer():
+def test_parquet_invalid_writer(tempdir):
     # avoid segfaults with invalid construction
     with pytest.raises(TypeError):
         some_schema = pa.schema([pa.field("x", pa.int32())])
         pq.ParquetWriter(None, some_schema)
 
     with pytest.raises(TypeError):
-        pq.ParquetWriter("some_path", None)
+        pq.ParquetWriter(tempdir / "some_path", None)
 
 
 @pytest.mark.pandas
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 6f3b54b0cd..c6967326b3 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -25,6 +25,7 @@ import textwrap
 import tempfile
 import threading
 import time
+from shutil import copytree
 
 from urllib.parse import quote
 
@@ -788,12 +789,15 @@ def test_parquet_scan_options():
     opts5 = ds.ParquetFragmentScanOptions(
         thrift_string_size_limit=123456,
         thrift_container_size_limit=987654,)
+    opts6 = ds.ParquetFragmentScanOptions(
+        page_checksum_verification=True)
 
     assert opts1.use_buffered_stream is False
     assert opts1.buffer_size == 2**13
     assert opts1.pre_buffer is True
     assert opts1.thrift_string_size_limit == 100_000_000  # default in C++
     assert opts1.thrift_container_size_limit == 1_000_000  # default in C++
+    assert opts1.page_checksum_verification is False
 
     assert opts2.use_buffered_stream is False
     assert opts2.buffer_size == 2**12
@@ -810,11 +814,14 @@ def test_parquet_scan_options():
     assert opts5.thrift_string_size_limit == 123456
     assert opts5.thrift_container_size_limit == 987654
 
+    assert opts6.page_checksum_verification is True
+
     assert opts1 == opts1
     assert opts1 != opts2
     assert opts2 != opts3
     assert opts3 != opts4
     assert opts5 != opts1
+    assert opts6 != opts1
 
 
 def test_file_format_pickling(pickle_module):
@@ -5376,3 +5383,76 @@ def test_dataset_sort_by(tempdir, dstype):
     sorted_tab_dict = sorted_tab.to_table().to_pydict()
     assert sorted_tab_dict["a"] == [5, 7, 7, 35]
     assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"]
+
+
+def test_checksum_write_dataset_read_dataset_to_table(tempdir):
+    """Check that checksum verification works for datasets created with
+    ds.write_dataset and read with ds.dataset.to_table"""
+
+    table_orig = pa.table({'a': [1, 2, 3, 4]})
+
+    # Write a sample dataset with page checksum enabled
+    pq_write_format = pa.dataset.ParquetFileFormat()
+    write_options = pq_write_format.make_write_options(
+        write_page_checksum=True)
+
+    original_dir_path = tempdir / 'correct_dir'
+    ds.write_dataset(
+        data=table_orig,
+        base_dir=original_dir_path,
+        format=pq_write_format,
+        file_options=write_options,
+    )
+
+    # Open dataset and verify that the data is correct
+    pq_scan_opts_crc = ds.ParquetFragmentScanOptions(
+        page_checksum_verification=True)
+    pq_read_format_crc = pa.dataset.ParquetFileFormat(
+        default_fragment_scan_options=pq_scan_opts_crc)
+    table_check = ds.dataset(
+        original_dir_path,
+        format=pq_read_format_crc
+    ).to_table()
+    assert table_orig == table_check
+
+    # Copy dataset dir (which should be just one file)
+    corrupted_dir_path = tempdir / 'corrupted_dir'
+    copytree(original_dir_path, corrupted_dir_path)
+
+    # Read the only file in the path as binary and swap the 31-th and 36-th
+    # bytes. This should be equivalent to storing the following data:
+    #    pa.table({'a': [1, 3, 2, 4]})
+    corrupted_file_path_list = list(corrupted_dir_path.iterdir())
+    assert len(corrupted_file_path_list) == 1
+    corrupted_file_path = corrupted_file_path_list[0]
+    bin_data = bytearray(corrupted_file_path.read_bytes())
+
+    # Swap two bytes to emulate corruption. Also, check that the two bytes are
+    # different, otherwise no corruption occurs
+    assert bin_data[31] != bin_data[36]
+    bin_data[31], bin_data[36] = bin_data[36], bin_data[31]
+
+    # Write the corrupted data to the parquet file
+    corrupted_file_path.write_bytes(bin_data)
+
+    # Case 1: Reading the corrupted file with dataset().to_table() and without
+    # page checksum verification succeeds but yields corrupted data
+    pq_scan_opts_no_crc = ds.ParquetFragmentScanOptions(
+        page_checksum_verification=False)
+    pq_read_format_no_crc = pa.dataset.ParquetFileFormat(
+        default_fragment_scan_options=pq_scan_opts_no_crc)
+    table_corrupt = ds.dataset(
+        corrupted_dir_path, format=pq_read_format_no_crc).to_table()
+
+    # The read should complete without error, but the table has different
+    # content than the original file!
+    assert table_corrupt != table_orig
+    assert table_corrupt == pa.table({'a': [1, 3, 2, 4]})
+
+    # Case 2: Reading the corrupted file with read_table() and with page
+    # checksum verification enabled raises an exception
+    with pytest.raises(OSError, match="CRC checksum verification"):
+        _ = ds.dataset(
+            corrupted_dir_path,
+            format=pq_read_format_crc
+        ).to_table()