Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/02 00:57:51 UTC

[GitHub] [arrow] arw2019 opened a new pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

arw2019 opened a new pull request #8816:
URL: https://github.com/apache/arrow/pull/8816


   Only relocation - none of the tests are touched.
   
   cc @jorisvandenbossche


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] arw2019 commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
arw2019 commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r536531363



##########
File path: python/pyarrow/tests/parquet/test_pandas.py
##########
@@ -0,0 +1,759 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       these aren't pandas-dependent tests - rather, they're the tests that exercise interop with pandas data structures
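       For context, a minimal sketch (not taken from the PR; the test name is
       illustrative and `tmp_path` is pytest's built-in fixture) of the kind of
       pandas-interop round trip these tests cover:

           import pandas as pd
           import pandas.testing as tm

           import pyarrow as pa
           import pyarrow.parquet as pq


           def test_pandas_parquet_roundtrip(tmp_path):
               df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
               path = tmp_path / 'pandas_roundtrip.parquet'

               # write via an Arrow table built from the DataFrame ...
               table = pa.Table.from_pandas(df, preserve_index=False)
               pq.write_table(table, path)

               # ... and check that the DataFrame read back is identical
               result = pq.read_table(path).to_pandas()
               tm.assert_frame_equal(result, df)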







[GitHub] [arrow] arw2019 commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
arw2019 commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r536531053



##########
File path: python/pyarrow/tests/parquet/test_multifile_ds.py
##########
@@ -0,0 +1,1662 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import os
+from distutils.version import LooseVersion
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow import fs
+from pyarrow.filesystem import FileSystem, LocalFileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (
+    parametrize_legacy_dataset,
+    parametrize_legacy_dataset_fixed, parametrize_legacy_dataset_not_supported)
+from pyarrow.util import guid
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (
+        _read_table, _test_dataframe, _test_read_common_metadata_files,
+        _test_write_to_dataset_no_partitions,
+        _test_write_to_dataset_with_partitions, _write_table)
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+
+except ImportError:
+    pd = tm = None
+
+
+@pytest.mark.pandas
+def test_parquet_piece_read(tempdir):
+    df = _test_dataframe(1000)
+    table = pa.Table.from_pandas(df)
+
+    path = tempdir / 'parquet_piece_read.parquet'
+    _write_table(table, path, version='2.0')
+
+    piece1 = pq.ParquetDatasetPiece(path)
+
+    result = piece1.read()
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+def test_parquet_piece_open_and_get_metadata(tempdir):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df)
+
+    path = tempdir / 'parquet_piece_read.parquet'
+    _write_table(table, path, version='2.0')
+
+    piece = pq.ParquetDatasetPiece(path)
+    table1 = piece.read()
+    assert isinstance(table1, pa.Table)
+    meta1 = piece.get_metadata()
+    assert isinstance(meta1, pq.FileMetaData)
+
+    assert table.equals(table1)
+
+
+def test_parquet_piece_basics():
+    path = '/baz.parq'
+
+    piece1 = pq.ParquetDatasetPiece(path)
+    piece2 = pq.ParquetDatasetPiece(path, row_group=1)
+    piece3 = pq.ParquetDatasetPiece(
+        path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])
+
+    assert str(piece1) == path
+    assert str(piece2) == '/baz.parq | row_group=1'
+    assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1'
+
+    assert piece1 == piece1
+    assert piece2 == piece2
+    assert piece3 == piece3
+    assert piece1 != piece3
+
+
+def test_partition_set_dictionary_type():
+    set1 = pq.PartitionSet('key1', ['foo', 'bar', 'baz'])
+    set2 = pq.PartitionSet('key2', [2007, 2008, 2009])
+
+    assert isinstance(set1.dictionary, pa.StringArray)
+    assert isinstance(set2.dictionary, pa.IntegerArray)
+
+    set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
+    with pytest.raises(TypeError):
+        set3.dictionary
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_directory(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+def test_create_parquet_dataset_multi_threaded(tempdir):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    _partition_test_for_filesystem(fs, base_path)
+
+    manifest = pq.ParquetManifest(base_path, filesystem=fs,
+                                  metadata_nthreads=1)
+    dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
+    assert len(dataset.pieces) > 0
+    partitions = dataset.partitions
+    assert len(partitions.partition_names) > 0
+    assert partitions.partition_names == manifest.partitions.partition_names
+    assert len(partitions.levels) == len(manifest.partitions.levels)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_columns_selection(tempdir, use_legacy_dataset):
+    # ARROW-3861 - do not include partition columns in resulting table when
+    # `columns` keyword was passed without those columns
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+    _partition_test_for_filesystem(fs, base_path)
+
+    dataset = pq.ParquetDataset(
+        base_path, use_legacy_dataset=use_legacy_dataset)
+    result = dataset.read(columns=["values"])
+    if use_legacy_dataset:
+        # ParquetDataset implementation always includes the partition columns
+        # automatically, and we can't easily "fix" this since dask relies on
+        # this behaviour (ARROW-8644)
+        assert result.column_names == ["values", "foo", "bar"]
+    else:
+        assert result.column_names == ["values"]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_equivalency(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1]
+    string_keys = ['a', 'b', 'c']
+    boolean_keys = [True, False]
+    partition_spec = [
+        ['integer', integer_keys],
+        ['string', string_keys],
+        ['boolean', boolean_keys]
+    ]
+
+    df = pd.DataFrame({
+        'integer': np.array(integer_keys, dtype='i4').repeat(15),
+        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+                           3),
+    }, columns=['integer', 'string', 'boolean'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    # Old filters syntax:
+    #  integer == 1 AND string != b AND boolean == True
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[('integer', '=', 1), ('string', '!=', 'b'),
+                 ('boolean', '==', True)],
+        use_legacy_dataset=use_legacy_dataset,
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas().reset_index(drop=True))
+
+    assert 0 not in result_df['integer'].values
+    assert 'b' not in result_df['string'].values
+    assert False not in result_df['boolean'].values
+
+    # filters in disjunctive normal form:
+    #  (integer == 1 AND string != b AND boolean == True) OR
+    #  (integer == 0 AND boolean == False)
+    # TODO(ARROW-3388): boolean columns are reconstructed as string
+    filters = [
+        [
+            ('integer', '=', 1),
+            ('string', '!=', 'b'),
+            ('boolean', '==', 'True')
+        ],
+        [('integer', '=', 0), ('boolean', '==', 'False')]
+    ]
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs, filters=filters,
+        use_legacy_dataset=use_legacy_dataset)
+    table = dataset.read()
+    result_df = table.to_pandas().reset_index(drop=True)
+
+    # Check that all rows in the DF fulfill the filter
+    # Pandas 0.23.x has problems with indexing constant memoryviews in
+    # categoricals. Thus we need to make an explicit copy here with np.array.
+    df_filter_1 = (np.array(result_df['integer']) == 1) \
+        & (np.array(result_df['string']) != 'b') \
+        & (np.array(result_df['boolean']) == 'True')
+    df_filter_2 = (np.array(result_df['integer']) == 0) \
+        & (np.array(result_df['boolean']) == 'False')
+    assert df_filter_1.sum() > 0
+    assert df_filter_2.sum() > 0
+    assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())
+
+    if use_legacy_dataset:
+        # Check for \0 in predicate values. Until they are correctly
+        # implemented in ARROW-3391, they would otherwise lead to weird
+        # results with the current code.
+        with pytest.raises(NotImplementedError):
+            filters = [[('string', '==', b'1\0a')]]
+            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+        with pytest.raises(NotImplementedError):
+            filters = [[('string', '==', '1\0a')]]
+            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+    else:
+        for filters in [[[('string', '==', b'1\0a')]],
+                        [[('string', '==', '1\0a')]]]:
+            dataset = pq.ParquetDataset(
+                base_path, filesystem=fs, filters=filters,
+                use_legacy_dataset=False)
+            assert dataset.read().num_rows == 0
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[
+            ('integers', '<', 4),
+            ('integers', '>', 1),
+        ],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                      .sort_values(by='index')
+                      .reset_index(drop=True))
+
+    result_list = [int(x) for x in result_df['integers'].values]
+    assert result_list == [2, 3]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.xfail(
+    # different error with use_legacy_datasets because result_df is no longer
+    # categorical
+    raises=(TypeError, AssertionError),
+    reason='Loss of type information in creation of categoricals.'
+)
+def test_filters_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    date_keys = [
+        datetime.date(2018, 4, 9),
+        datetime.date(2018, 4, 10),
+        datetime.date(2018, 4, 11),
+        datetime.date(2018, 4, 12),
+        datetime.date(2018, 4, 13)
+    ]
+    partition_spec = [
+        ['dates', date_keys]
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'dates': np.array(date_keys, dtype='datetime64'),
+    }, columns=['index', 'dates'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[
+            ('dates', '<', "2018-04-12"),
+            ('dates', '>', "2018-04-10")
+        ],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                      .sort_values(by='index')
+                      .reset_index(drop=True))
+
+    expected = pd.Categorical(
+        np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
+        categories=np.array(date_keys, dtype='datetime64'))
+
+    assert result_df['dates'].values == expected
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_inclusive_integer(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[
+            ('integers', '<=', 3),
+            ('integers', '>=', 2),
+        ],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                 .sort_values(by='index')
+                 .reset_index(drop=True))
+
+    result_list = [int(x) for x in result_df['integers'].values]
+    assert result_list == [2, 3]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_inclusive_set(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1]
+    string_keys = ['a', 'b', 'c']
+    boolean_keys = [True, False]
+    partition_spec = [
+        ['integer', integer_keys],
+        ['string', string_keys],
+        ['boolean', boolean_keys]
+    ]
+
+    df = pd.DataFrame({
+        'integer': np.array(integer_keys, dtype='i4').repeat(15),
+        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+                           3),
+    }, columns=['integer', 'string', 'boolean'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[('integer', 'in', {1}), ('string', 'in', {'a', 'b'}),
+                 ('boolean', 'in', {True})],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas().reset_index(drop=True))
+
+    assert 0 not in result_df['integer'].values
+    assert 'c' not in result_df['string'].values
+    assert False not in result_df['boolean'].values
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_invalid_pred_op(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(base_path,
+                          filesystem=fs,
+                          filters=[('integers', '=<', 3), ],
+                          use_legacy_dataset=use_legacy_dataset)
+
+    if use_legacy_dataset:
+        with pytest.raises(ValueError):
+            pq.ParquetDataset(base_path,
+                              filesystem=fs,
+                              filters=[('integers', 'in', set()), ],
+                              use_legacy_dataset=use_legacy_dataset)
+    else:
+        # Dataset API returns empty table instead
+        dataset = pq.ParquetDataset(base_path,
+                                    filesystem=fs,
+                                    filters=[('integers', 'in', set()), ],
+                                    use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().num_rows == 0
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(base_path,
+                          filesystem=fs,
+                          filters=[('integers', '!=', {3})],
+                          use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_filters_invalid_column(tempdir, use_legacy_dataset):
+    # ARROW-5572 - raise error on invalid name in filter specification
+    # works with new dataset / xfail with legacy implementation
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [['integers', integer_keys]]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    msg = "Field named 'non_existent_column' not found"
+    with pytest.raises(ValueError, match=msg):
+        pq.ParquetDataset(base_path, filesystem=fs,
+                          filters=[('non_existent_column', '<', 3), ],
+                          use_legacy_dataset=use_legacy_dataset).read()
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_read_table(tempdir, use_legacy_dataset):
+    # test that filters keyword is passed through in read_table
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    table = pq.read_table(
+        base_path, filesystem=fs, filters=[('integers', '<', 3)],
+        use_legacy_dataset=use_legacy_dataset)
+    assert table.num_rows == 3
+
+    table = pq.read_table(
+        base_path, filesystem=fs, filters=[[('integers', '<', 3)]],
+        use_legacy_dataset=use_legacy_dataset)
+    assert table.num_rows == 3
+
+    table = pq.read_pandas(
+        base_path, filters=[('integers', '<', 3)],
+        use_legacy_dataset=use_legacy_dataset)
+    assert table.num_rows == 3
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_partition_keys_with_underscores(tempdir, use_legacy_dataset):
+    # ARROW-5666 - partition field values with underscores preserve underscores
+    # xfail with legacy dataset -> they get interpreted as integers
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    string_keys = ["2019_2", "2019_3"]
+    partition_spec = [
+        ['year_week', string_keys],
+    ]
+    N = 2
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'year_week': np.array(string_keys, dtype='object'),
+    }, columns=['index', 'year_week'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, use_legacy_dataset=use_legacy_dataset)
+    result = dataset.read()
+    assert result.column("year_week").to_pylist() == string_keys
+
+
+@pytest.fixture
+def s3_bucket(request, s3_connection, s3_server):
+    boto3 = pytest.importorskip('boto3')
+    botocore = pytest.importorskip('botocore')
+
+    host, port, access_key, secret_key = s3_connection
+    s3 = boto3.resource(
+        's3',
+        endpoint_url='http://{}:{}'.format(host, port),
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key,
+        config=botocore.client.Config(signature_version='s3v4'),
+        region_name='us-east-1'
+    )
+    bucket = s3.Bucket('test-s3fs')
+    try:
+        bucket.create()
+    except Exception:
+        # we get BucketAlreadyOwnedByYou error with fsspec handler
+        pass
+    return 'test-s3fs'
+
+
+@pytest.fixture
+def s3_example_s3fs(s3_connection, s3_server, s3_bucket):
+    s3fs = pytest.importorskip('s3fs')
+
+    host, port, access_key, secret_key = s3_connection
+    fs = s3fs.S3FileSystem(
+        key=access_key,
+        secret=secret_key,
+        client_kwargs={
+            'endpoint_url': 'http://{}:{}'.format(host, port)
+        }
+    )
+
+    test_path = '{}/{}'.format(s3_bucket, guid())
+
+    fs.mkdir(test_path)
+    yield fs, test_path
+    try:
+        fs.rm(test_path, recursive=True)
+    except FileNotFoundError:
+        pass
+
+
+@parametrize_legacy_dataset
+def test_read_s3fs(s3_example_s3fs, use_legacy_dataset):
+    fs, path = s3_example_s3fs
+    path = path + "/test.parquet"
+    table = pa.table({"a": [1, 2, 3]})
+    _write_table(table, path, filesystem=fs)
+
+    result = _read_table(
+        path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+    )
+    assert result.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_read_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
+    fs, directory = s3_example_s3fs
+    path = directory + "/test.parquet"
+    table = pa.table({"a": [1, 2, 3]})
+    _write_table(table, path, filesystem=fs)
+
+    result = _read_table(
+        directory, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+    )
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_read_partitioned_directory_s3fs_wrapper(
+    s3_example_s3fs, use_legacy_dataset
+):
+    import s3fs
+
+    from pyarrow.filesystem import S3FSWrapper
+
+    if s3fs.__version__ >= LooseVersion("0.5"):
+        pytest.skip("S3FSWrapper no longer working for s3fs 0.5+")
+
+    fs, path = s3_example_s3fs
+    with pytest.warns(DeprecationWarning):
+        wrapper = S3FSWrapper(fs)
+    _partition_test_for_filesystem(wrapper, path)
+
+    # Check that we can auto-wrap
+    dataset = pq.ParquetDataset(
+        path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+    )
+    dataset.read()
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
+    fs, path = s3_example_s3fs
+    _partition_test_for_filesystem(
+        fs, path, use_legacy_dataset=use_legacy_dataset
+    )
+
+
+def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
+    foo_keys = [0, 1]
+    bar_keys = ['a', 'b', 'c']
+    partition_spec = [
+        ['foo', foo_keys],
+        ['bar', bar_keys]
+    ]
+    N = 30
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'foo': np.array(foo_keys, dtype='i4').repeat(15),
+        'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
+        'values': np.random.randn(N)
+    }, columns=['index', 'foo', 'bar', 'values'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset)
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                 .sort_values(by='index')
+                 .reset_index(drop=True))
+
+    expected_df = (df.sort_values(by='index')
+                   .reset_index(drop=True)
+                   .reindex(columns=result_df.columns))
+
+    expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+    expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
+
+    assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
+
+    tm.assert_frame_equal(result_df, expected_df)
+
+
+def _generate_partition_directories(fs, base_dir, partition_spec, df):
+    # partition_spec : list of lists, e.g. [['foo', [0, 1, 2]],
+    #                                       ['bar', ['a', 'b', 'c']]]
+    # df : a pandas.DataFrame; the matching rows are written to each partition
+    DEPTH = len(partition_spec)
+
+    pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/"))
+
+    def _visit_level(base_dir, level, part_keys):
+        name, values = partition_spec[level]
+        for value in values:
+            this_part_keys = part_keys + [(name, value)]
+
+            level_dir = pathsep.join([
+                str(base_dir),
+                '{}={}'.format(name, value)
+            ])
+            fs.mkdir(level_dir)
+
+            if level == DEPTH - 1:
+                # Generate example data
+                file_path = pathsep.join([level_dir, guid()])
+                filtered_df = _filter_partition(df, this_part_keys)
+                part_table = pa.Table.from_pandas(filtered_df)
+                with fs.open(file_path, 'wb') as f:
+                    _write_table(part_table, f)
+                assert fs.exists(file_path)
+
+                file_success = pathsep.join([level_dir, '_SUCCESS'])
+                with fs.open(file_success, 'wb') as f:
+                    pass
+            else:
+                _visit_level(level_dir, level + 1, this_part_keys)
+                file_success = pathsep.join([level_dir, '_SUCCESS'])
+                with fs.open(file_success, 'wb') as f:
+                    pass
+
+    _visit_level(base_dir, 0, [])
+
+
+@pytest.mark.pandas
+def test_read_common_metadata_files(tempdir):
+    fs = LocalFileSystem._get_instance()
+    _test_read_common_metadata_files(fs, tempdir)
+
+
+@pytest.mark.pandas
+def test_read_metadata_files(tempdir):
+    fs = LocalFileSystem._get_instance()
+
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    data_path = tempdir / 'data.parquet'
+
+    table = pa.Table.from_pandas(df)
+
+    with fs.open(data_path, 'wb') as f:
+        _write_table(table, f)
+
+    metadata_path = tempdir / '_metadata'
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(tempdir, filesystem=fs)
+    assert dataset.metadata_path == str(metadata_path)
+
+    with fs.open(data_path) as f:
+        metadata_schema = pq.read_metadata(f).schema
+    assert dataset.schema.equals(metadata_schema)
+
+
+@pytest.mark.pandas
+def test_read_schema(tempdir):
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    data_path = tempdir / 'test.parquet'
+
+    table = pa.Table.from_pandas(df)
+    _write_table(table, data_path)
+
+    read1 = pq.read_schema(data_path)
+    read2 = pq.read_schema(data_path, memory_map=True)
+    assert table.schema.equals(read1)
+    assert table.schema.equals(read2)
+
+    assert table.schema.metadata[b'pandas'] == read1.metadata[b'pandas']
+
+
+def _filter_partition(df, part_keys):
+    predicate = np.ones(len(df), dtype=bool)
+
+    to_drop = []
+    for name, value in part_keys:
+        to_drop.append(name)
+
+        # to avoid pandas warning
+        if isinstance(value, (datetime.date, datetime.datetime)):
+            value = pd.Timestamp(value)
+
+        predicate &= df[name] == value
+
+    return df[predicate].drop(to_drop, axis=1)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_multiple_files(tempdir, use_legacy_dataset):
+    nfiles = 10
+    size = 5
+
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+
+        # Hack so that we don't have a dtype cast in v1 files
+        df['uint32'] = df['uint32'].astype(np.int64)
+
+        path = dirpath / '{}.parquet'.format(i)
+
+        table = pa.Table.from_pandas(df)
+        _write_table(table, path)
+
+        test_data.append(table)
+        paths.append(path)
+
+    # Write a _SUCCESS.crc file
+    (dirpath / '_SUCCESS.crc').touch()
+
+    def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
+        dataset = pq.ParquetDataset(
+            paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
+        return dataset.read(columns=columns, use_threads=use_threads)
+
+    result = read_multiple_files(paths)
+    expected = pa.concat_tables(test_data)
+
+    assert result.equals(expected)
+
+    # Read with provided metadata
+    # TODO(dataset) specifying metadata not yet supported
+    metadata = pq.read_metadata(paths[0])
+    if use_legacy_dataset:
+        result2 = read_multiple_files(paths, metadata=metadata)
+        assert result2.equals(expected)
+
+        result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
+        assert result3.equals(expected)
+    else:
+        with pytest.raises(ValueError, match="no longer supported"):
+            pq.read_table(paths, metadata=metadata, use_legacy_dataset=False)
+
+    # Read column subset
+    to_read = [0, 2, 6, result.num_columns - 1]
+
+    col_names = [result.field(i).name for i in to_read]
+    out = pq.read_table(
+        dirpath, columns=col_names, use_legacy_dataset=use_legacy_dataset
+    )
+    expected = pa.Table.from_arrays([result.column(i) for i in to_read],
+                                    names=col_names,
+                                    metadata=result.schema.metadata)
+    assert out.equals(expected)
+
+    # Read with multiple threads
+    pq.read_table(
+        dirpath, use_threads=True, use_legacy_dataset=use_legacy_dataset
+    )
+
+    # Test failure modes with non-uniform metadata
+    bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
+    bad_apple_path = tempdir / '{}.parquet'.format(guid())
+
+    t = pa.Table.from_pandas(bad_apple)
+    _write_table(t, bad_apple_path)
+
+    if not use_legacy_dataset:
+        # TODO(dataset) Dataset API skips bad files
+        return
+
+    bad_meta = pq.read_metadata(bad_apple_path)
+
+    with pytest.raises(ValueError):
+        read_multiple_files(paths + [bad_apple_path])
+
+    with pytest.raises(ValueError):
+        read_multiple_files(paths, metadata=bad_meta)
+
+    mixed_paths = [bad_apple_path, paths[0]]
+
+    with pytest.raises(ValueError):
+        read_multiple_files(mixed_paths, schema=bad_meta.schema)
+
+    with pytest.raises(ValueError):
+        read_multiple_files(mixed_paths)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_read_pandas(tempdir, use_legacy_dataset):
+    nfiles = 5
+    size = 5
+
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    test_data = []
+    frames = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+        df.index = np.arange(i * size, (i + 1) * size)
+        df.index.name = 'index'
+
+        path = dirpath / '{}.parquet'.format(i)
+
+        table = pa.Table.from_pandas(df)
+        _write_table(table, path)
+        test_data.append(table)
+        frames.append(df)
+        paths.append(path)
+
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+    columns = ['uint8', 'strings']
+    result = dataset.read_pandas(columns=columns).to_pandas()
+    expected = pd.concat([x[columns] for x in frames])
+
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_memory_map(tempdir, use_legacy_dataset):
+    # ARROW-2627: Check that we can use ParquetDataset with memory-mapping
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    df = _test_dataframe(10, seed=0)
+    path = dirpath / '{}.parquet'.format(0)
+    table = pa.Table.from_pandas(df)
+    _write_table(table, path, version='2.0')
+
+    dataset = pq.ParquetDataset(
+        dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset)
+    assert dataset.read().equals(table)
+    if use_legacy_dataset:
+        assert dataset.pieces[0].read().equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    df = _test_dataframe(10, seed=0)
+    path = dirpath / '{}.parquet'.format(0)
+    table = pa.Table.from_pandas(df)
+    _write_table(table, path, version='2.0')
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(
+            dirpath, buffer_size=-64,
+            use_legacy_dataset=use_legacy_dataset)
+
+    for buffer_size in [128, 1024]:
+        dataset = pq.ParquetDataset(
+            dirpath, buffer_size=buffer_size,
+            use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().equals(table)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('preserve_index', [True, False, None])
+def test_dataset_read_pandas_common_metadata(tempdir, preserve_index):
+    # ARROW-1103
+    nfiles = 5
+    size = 5
+
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    test_data = []
+    frames = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+        df.index = pd.Index(np.arange(i * size, (i + 1) * size), name='index')
+
+        path = dirpath / '{}.parquet'.format(i)
+
+        table = pa.Table.from_pandas(df, preserve_index=preserve_index)
+
+        # Obliterate metadata
+        table = table.replace_schema_metadata(None)
+        assert table.schema.metadata is None
+
+        _write_table(table, path)
+        test_data.append(table)
+        frames.append(df)
+        paths.append(path)
+
+    # Write _metadata common file
+    table_for_metadata = pa.Table.from_pandas(
+        df, preserve_index=preserve_index
+    )
+    pq.write_metadata(table_for_metadata.schema, dirpath / '_metadata')
+
+    dataset = pq.ParquetDataset(dirpath)
+    columns = ['uint8', 'strings']
+    result = dataset.read_pandas(columns=columns).to_pandas()
+    expected = pd.concat([x[columns] for x in frames])
+    expected.index.name = (
+        df.index.name if preserve_index is not False else None)
+    tm.assert_frame_equal(result, expected)
+
+
+def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(file_nrows, seed=i)
+        path = base_path / '{}.parquet'.format(i)
+
+        test_data.append(_write_table(df, path))
+        paths.append(path)
+    return paths
+
+
+def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
+    if use_legacy_dataset:
+        assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+    else:
+        paths = [str(path.as_posix()) for path in paths]
+        assert set(paths) == set(dataset._dataset.files)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dir_prefix', ['_', '.'])
+def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    # private directory
+    (dirpath / '{}staging'.format(dir_prefix)).mkdir()
+
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    with (dirpath / '.DS_Store').open('wb') as f:
+        f.write(b'gibberish')
+
+    with (dirpath / '.private').open('wb') as f:
+        f.write(b'gibberish')
+
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    with (dirpath / '_committed_123').open('wb') as f:
+        f.write(b'abcd')
+
+    with (dirpath / '_started_321').open('wb') as f:
+        f.write(b'abcd')
+
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dir_prefix', ['_', '.'])
+def test_ignore_no_private_directories_in_base_path(
+    tempdir, dir_prefix, use_legacy_dataset
+):
+    # ARROW-8427 - don't ignore explicitly listed files if parent directory
+    # is a private directory
+    dirpath = tempdir / "{0}data".format(dir_prefix) / guid()
+    dirpath.mkdir(parents=True)
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset)
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+    # ARROW-9644 - don't ignore full directory with underscore in base path
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_ignore_custom_prefixes(tempdir, use_legacy_dataset):
+    # ARROW-9573 - allow override of default ignore_prefixes
+    part = ["xxx"] * 3 + ["yyy"] * 3
+    table = pa.table([
+        pa.array(range(len(part))),
+        pa.array(part).dictionary_encode(),
+    ], names=['index', '_part'])
+
+    # TODO use_legacy_dataset ARROW-10247
+    pq.write_to_dataset(table, str(tempdir), partition_cols=['_part'])
+
+    private_duplicate = tempdir / '_private_duplicate'
+    private_duplicate.mkdir()
+    pq.write_to_dataset(table, str(private_duplicate),
+                        partition_cols=['_part'])
+
+    read = pq.read_table(
+        tempdir, use_legacy_dataset=use_legacy_dataset,
+        ignore_prefixes=['_private'])
+
+    assert read.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_with_partitions(str(tempdir), use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_schema(
+    tempdir, use_legacy_dataset
+):
+    schema = pa.schema([pa.field('group1', type=pa.string()),
+                        pa.field('group2', type=pa.string()),
+                        pa.field('num', type=pa.int64()),
+                        pa.field('nan', type=pa.int32()),
+                        pa.field('date', type=pa.timestamp(unit='us'))])
+    _test_write_to_dataset_with_partitions(
+        str(tempdir), use_legacy_dataset, schema=schema)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_index_name(
+    tempdir, use_legacy_dataset
+):
+    _test_write_to_dataset_with_partitions(
+        str(tempdir), use_legacy_dataset, index_name='index_name')
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_no_partitions(str(tempdir), use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pathlib(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_with_partitions(
+        tempdir / "test1", use_legacy_dataset)
+    _test_write_to_dataset_no_partitions(
+        tempdir / "test2", use_legacy_dataset)
+
+
+# These tests are failing - see ARROW-10370
+# @pytest.mark.pandas
+# @pytest.mark.s3
+# @parametrize_legacy_dataset
+# def test_write_to_dataset_pathlib_nonlocal(
+#     tempdir, s3_example_s3fs, use_legacy_dataset
+# ):
+#     # pathlib paths are only accepted for local files
+#     fs, _ = s3_example_s3fs

+#     with pytest.raises(TypeError, match="path-like objects are only allowed"):
+#         _test_write_to_dataset_with_partitions(
+#             tempdir / "test1", use_legacy_dataset, filesystem=fs)

+#     with pytest.raises(TypeError, match="path-like objects are only allowed"):
+#         _test_write_to_dataset_no_partitions(
+#             tempdir / "test2", use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_s3fs(
+    s3_example_s3fs, use_legacy_dataset
+):
+    fs, path = s3_example_s3fs
+
+    _test_write_to_dataset_with_partitions(
+        path, use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions_s3fs(
+    s3_example_s3fs, use_legacy_dataset
+):
+    fs, path = s3_example_s3fs
+
+    _test_write_to_dataset_no_partitions(
+        path, use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_not_supported
+def test_write_to_dataset_with_partitions_and_custom_filenames(
+    tempdir, use_legacy_dataset
+):
+    output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
+                              'group2': list('eefeffgeee'),
+                              'num': list(range(10)),
+                              'nan': [np.nan] * 10,
+                              'date': np.arange('2017-01-01', '2017-01-11',
+                                                dtype='datetime64[D]')})
+    partition_by = ['group1', 'group2']
+    output_table = pa.Table.from_pandas(output_df)
+    path = str(tempdir)
+
+    def partition_filename_callback(keys):
+        return "{}-{}.parquet".format(*keys)
+
+    pq.write_to_dataset(output_table, path,
+                        partition_by, partition_filename_callback,
+                        use_legacy_dataset=use_legacy_dataset)
+
+    dataset = pq.ParquetDataset(path)
+
+    # ARROW-3538: Ensure partition filenames match the given pattern
+    # defined in the local function partition_filename_callback
+    expected_basenames = [
+        'a-e.parquet', 'a-f.parquet',
+        'b-e.parquet', 'b-f.parquet',
+        'b-g.parquet', 'c-e.parquet'
+    ]
+    output_basenames = [os.path.basename(p.path) for p in dataset.pieces]
+
+    assert sorted(expected_basenames) == sorted(output_basenames)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pandas_preserve_extensiondtypes(
+    tempdir, use_legacy_dataset
+):
+    # ARROW-8251 - preserve pandas extension dtypes in roundtrip
+    if LooseVersion(pd.__version__) < "1.0.0":
+        pytest.skip("__arrow_array__ added to pandas in 1.0.0")
+
+    df = pd.DataFrame({'part': 'a', "col": [1, 2, 3]})
+    df['col'] = df['col'].astype("Int64")
+    table = pa.table(df)
+
+    pq.write_to_dataset(
+        table, str(tempdir / "case1"), partition_cols=['part'],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    result = pq.read_table(
+        str(tempdir / "case1"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+    pq.write_to_dataset(
+        table, str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+    )
+    result = pq.read_table(
+        str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    result = pq.read_table(
+        str(tempdir / "data.parquet"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pandas_preserve_index(tempdir, use_legacy_dataset):
+    # ARROW-8251 - preserve pandas index in roundtrip
+
+    df = pd.DataFrame({'part': ['a', 'a', 'b'], "col": [1, 2, 3]})
+    df.index = pd.Index(['a', 'b', 'c'], name="idx")
+    table = pa.table(df)
+    df_cat = df[["col", "part"]].copy()
+    df_cat["part"] = df_cat["part"].astype("category")
+
+    pq.write_to_dataset(
+        table, str(tempdir / "case1"), partition_cols=['part'],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    result = pq.read_table(
+        str(tempdir / "case1"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result, df_cat)
+
+    pq.write_to_dataset(
+        table, str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+    )
+    result = pq.read_table(
+        str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    result = pq.read_table(
+        str(tempdir / "data.parquet"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+# TODO(dataset) support pickling
+def _make_dataset_for_pickling(tempdir, N=100):
+    path = tempdir / 'data.parquet'
+    fs = LocalFileSystem._get_instance()
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+    table = pa.Table.from_pandas(df)
+
+    num_groups = 3
+    with pq.ParquetWriter(path, table.schema) as writer:
+        for i in range(num_groups):
+            writer.write_table(table)
+
+    reader = pq.ParquetFile(path)
+    assert reader.metadata.num_row_groups == num_groups
+
+    metadata_path = tempdir / '_metadata'
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(tempdir, filesystem=fs)
+    assert dataset.metadata_path == str(metadata_path)
+
+    return dataset
+
+
+def _assert_dataset_is_picklable(dataset, pickler):
+    def is_pickleable(obj):
+        return obj == pickler.loads(pickler.dumps(obj))
+
+    assert is_pickleable(dataset)
+    assert is_pickleable(dataset.metadata)
+    assert is_pickleable(dataset.metadata.schema)
+    assert len(dataset.metadata.schema)
+    for column in dataset.metadata.schema:
+        assert is_pickleable(column)
+
+    for piece in dataset.pieces:
+        assert is_pickleable(piece)
+        metadata = piece.get_metadata()
+        assert metadata.num_row_groups
+        for i in range(metadata.num_row_groups):
+            assert is_pickleable(metadata.row_group(i))
+
+
+@pytest.mark.pandas
+def test_builtin_pickle_dataset(tempdir, datadir):
+    import pickle
+    dataset = _make_dataset_for_pickling(tempdir)
+    _assert_dataset_is_picklable(dataset, pickler=pickle)
+
+
+@pytest.mark.pandas
+def test_cloudpickle_dataset(tempdir, datadir):
+    cp = pytest.importorskip('cloudpickle')
+    dataset = _make_dataset_for_pickling(tempdir)
+    _assert_dataset_is_picklable(dataset, pickler=cp)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+    None,
+    LocalFileSystem._get_instance(),
+    fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):

Review comment:
       I separated them out (into `test_writer.py`)
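       For reference, a hedged sketch of roughly how the body of that test
       reads after the move, assuming the module-level imports and the helpers
       from common.py (`_test_dataframe`, `_read_table`); the actual code in
       `test_writer.py` may differ:

           def test_parquet_writer_filesystem_local(tempdir, filesystem):
               df = _test_dataframe(100)
               table = pa.Table.from_pandas(df, preserve_index=False)
               path = str(tempdir / 'data.parquet')

               # the writer should work with a legacy filesystem, a new
               # pyarrow.fs filesystem, or None (infer from the path)
               with pq.ParquetWriter(
                   path, table.schema, filesystem=filesystem, version='2.0'
               ) as writer:
                   writer.write_table(table)

               result = _read_table(path).to_pandas()
               tm.assert_frame_equal(result, df)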







[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r546644093



##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _test_read_common_metadata_files(fs, base_path):
+    import pandas as pd
+
+    import pyarrow.parquet as pq
+
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    base_path = str(base_path)
+    data_path = os.path.join(base_path, 'data.parquet')
+
+    table = pa.Table.from_pandas(df)
+
+    with fs.open(data_path, 'wb') as f:
+        _write_table(table, f)
+
+    metadata_path = os.path.join(base_path, '_common_metadata')
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(base_path, filesystem=fs)
+    assert dataset.common_metadata_path == str(metadata_path)
+
+    with fs.open(data_path) as f:
+        common_schema = pq.read_metadata(f).schema
+    assert dataset.schema.equals(common_schema)
+
+    # handle list of one directory
+    dataset2 = pq.ParquetDataset([base_path], filesystem=fs)
+    assert dataset2.schema.equals(dataset.schema)
+
+
+def _random_integers(size, dtype):
+    # We do not generate integers outside the int64 range
+    platform_int_info = np.iinfo('int_')
+    iinfo = np.iinfo(dtype)
+    return np.random.randint(max(iinfo.min, platform_int_info.min),
+                             min(iinfo.max, platform_int_info.max),
+                             size=size).astype(dtype)
+
+
+def _test_dataframe(size=10000, seed=0):
+    import pandas as pd
+
+    np.random.seed(seed)
+    df = pd.DataFrame({
+        'uint8': _random_integers(size, np.uint8),
+        'uint16': _random_integers(size, np.uint16),
+        'uint32': _random_integers(size, np.uint32),
+        'uint64': _random_integers(size, np.uint64),
+        'int8': _random_integers(size, np.int8),
+        'int16': _random_integers(size, np.int16),
+        'int32': _random_integers(size, np.int32),
+        'int64': _random_integers(size, np.int64),
+        'float32': np.random.randn(size).astype(np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'strings': [util.rands(10) for i in range(size)],
+        'all_none': [None] * size,
+        'all_none_category': [None] * size
+    })
+
+    # TODO(PARQUET-1015)
+    # df['all_none_category'] = df['all_none_category'].astype('category')
+    return df
+
+
+def _test_write_to_dataset_with_partitions(base_path,

Review comment:
       I moved this one as well (to keep it consistent with `_test_read_common_metadata_files`)







[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r540167236



##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pyarrow as pa
+import pytest
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet

Review comment:
       Does this also work to mark the tests in the other files of this directory?
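
For what it's worth, `pytestmark` only marks tests defined in the module that declares it, so each test_*.py file in this directory still needs its own `pytestmark = pytest.mark.parquet` line. A directory-wide alternative would be a conftest.py hook; the sketch below uses standard pytest APIs and is not part of this PR:

# conftest.py (hypothetical) -- add the 'parquet' marker to every test
# collected under this directory so the individual modules don't need
# their own pytestmark line
import pytest


def pytest_collection_modifyitems(items):
    for item in items:
        item.add_marker(pytest.mark.parquet)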

##########
File path: python/pyarrow/tests/parquet/test_metadata.py
##########
@@ -0,0 +1,475 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import io
+from collections import OrderedDict
+
+import numpy as np
+import pyarrow as pa
+import pytest
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import _write_table
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+@pytest.mark.pandas
+def test_parquet_metadata_api():
+    df = alltypes_sample(size=10000)
+    df = df.reindex(columns=sorted(df.columns))
+    df.index = np.random.randint(0, 1000000, size=len(df))
+
+    fileh = make_sample_file(df)
+    ncols = len(df.columns)
+
+    # Series of sniff tests
+    meta = fileh.metadata
+    repr(meta)
+    assert meta.num_rows == len(df)
+    assert meta.num_columns == ncols + 1  # +1 for index
+    assert meta.num_row_groups == 1
+    assert meta.format_version == '2.0'
+    assert 'parquet-cpp' in meta.created_by
+    assert isinstance(meta.serialized_size, int)
+    assert isinstance(meta.metadata, dict)
+
+    # Schema
+    schema = fileh.schema
+    assert meta.schema is schema
+    assert len(schema) == ncols + 1  # +1 for index
+    repr(schema)
+
+    col = schema[0]
+    repr(col)
+    assert col.name == df.columns[0]
+    assert col.max_definition_level == 1
+    assert col.max_repetition_level == 0
+
+    assert col.physical_type == 'BOOLEAN'
+    assert col.converted_type == 'NONE'
+
+    with pytest.raises(IndexError):
+        schema[ncols + 1]  # +1 for index
+
+    with pytest.raises(IndexError):
+        schema[-1]
+
+    # Row group
+    for rg in range(meta.num_row_groups):
+        rg_meta = meta.row_group(rg)
+        assert isinstance(rg_meta, pq.RowGroupMetaData)
+        repr(rg_meta)
+
+        for col in range(rg_meta.num_columns):
+            col_meta = rg_meta.column(col)
+            assert isinstance(col_meta, pq.ColumnChunkMetaData)
+            repr(col_meta)
+
+    with pytest.raises(IndexError):
+        meta.row_group(-1)
+
+    with pytest.raises(IndexError):
+        meta.row_group(meta.num_row_groups + 1)
+
+    rg_meta = meta.row_group(0)
+    assert rg_meta.num_rows == len(df)
+    assert rg_meta.num_columns == ncols + 1  # +1 for index
+    assert rg_meta.total_byte_size > 0
+
+    with pytest.raises(IndexError):
+        col_meta = rg_meta.column(-1)
+
+    with pytest.raises(IndexError):
+        col_meta = rg_meta.column(ncols + 2)
+
+    col_meta = rg_meta.column(0)
+    assert col_meta.file_offset > 0
+    assert col_meta.file_path == ''  # created from BytesIO
+    assert col_meta.physical_type == 'BOOLEAN'
+    assert col_meta.num_values == 10000
+    assert col_meta.path_in_schema == 'bool'
+    assert col_meta.is_stats_set is True
+    assert isinstance(col_meta.statistics, pq.Statistics)
+    assert col_meta.compression == 'SNAPPY'
+    assert col_meta.encodings == ('PLAIN', 'RLE')
+    assert col_meta.has_dictionary_page is False
+    assert col_meta.dictionary_page_offset is None
+    assert col_meta.data_page_offset > 0
+    assert col_meta.total_compressed_size > 0
+    assert col_meta.total_uncompressed_size > 0
+    with pytest.raises(NotImplementedError):
+        col_meta.has_index_page
+    with pytest.raises(NotImplementedError):
+        col_meta.index_page_offset
+
+
+def test_parquet_metadata_lifetime(tempdir):
+    # ARROW-6642 - ensure that chained access keeps parent objects alive
+    table = pa.table({'a': [1, 2, 3]})
+    pq.write_table(table, tempdir / 'test_metadata_segfault.parquet')
+    dataset = pq.ParquetDataset(tempdir / 'test_metadata_segfault.parquet')
+    dataset.pieces[0].get_metadata().row_group(0).column(0).statistics
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize(
+    (
+        'data',
+        'type',
+        'physical_type',
+        'min_value',
+        'max_value',
+        'null_count',
+        'num_values',
+        'distinct_count'
+    ),
+    [
+        ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, 0),
+        ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, 0),
+        ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, 0),
+        ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, 0),
+        ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, 0),
+        ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, 0),
+        ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, 0),
+        ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, 0),
+        (
+            [-1.1, 2.2, 2.3, None, 4.4], pa.float32(),
+            'FLOAT', -1.1, 4.4, 1, 4, 0
+        ),
+        (
+            [-1.1, 2.2, 2.3, None, 4.4], pa.float64(),
+            'DOUBLE', -1.1, 4.4, 1, 4, 0
+        ),
+        (
+            ['', 'b', chr(1000), None, 'aaa'], pa.binary(),
+            'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, 0
+        ),
+        (
+            [True, False, False, True, True], pa.bool_(),
+            'BOOLEAN', False, True, 0, 5, 0
+        ),
+        (
+            [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(),
+            'BYTE_ARRAY', b'\x00', b'b', 1, 4, 0
+        ),
+    ]
+)
+def test_parquet_column_statistics_api(data, type, physical_type, min_value,
+                                       max_value, null_count, num_values,
+                                       distinct_count):
+    df = pd.DataFrame({'data': data})
+    schema = pa.schema([pa.field('data', type)])
+    table = pa.Table.from_pandas(df, schema=schema, safe=False)
+    fileh = make_sample_file(table)
+
+    meta = fileh.metadata
+
+    rg_meta = meta.row_group(0)
+    col_meta = rg_meta.column(0)
+
+    stat = col_meta.statistics
+    assert stat.has_min_max
+    assert _close(type, stat.min, min_value)
+    assert _close(type, stat.max, max_value)
+    assert stat.null_count == null_count
+    assert stat.num_values == num_values
+    # TODO(kszucs): until the parquet-cpp API exposes the HasDistinctCount
+    # method, a missing distinct_count is represented as zero instead of None
+    assert stat.distinct_count == distinct_count
+    assert stat.physical_type == physical_type
+
+
+# ARROW-6339
+@pytest.mark.pandas
+def test_parquet_raise_on_unset_statistics():
+    df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")})
+    meta = make_sample_file(pa.Table.from_pandas(df)).metadata
+
+    assert not meta.row_group(0).column(0).statistics.has_min_max
+    assert meta.row_group(0).column(0).statistics.max is None
+
+
+def _close(type, left, right):
+    if type == pa.float32():
+        return abs(left - right) < 1E-7
+    elif type == pa.float64():
+        return abs(left - right) < 1E-13
+    else:
+        return left == right
+
+
+def test_statistics_convert_logical_types(tempdir):
+    # ARROW-5166, ARROW-4139
+
+    # (min, max, type)
+    cases = [(10, 11164359321221007157, pa.uint64()),
+             (10, 4294967295, pa.uint32()),
+             ("ähnlich", "öffentlich", pa.utf8()),
+             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
+              pa.time32('ms')),
+             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
+              pa.time64('us')),
+             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
+              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
+              pa.timestamp('ms')),
+             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
+              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
+              pa.timestamp('us'))]
+
+    for i, (min_val, max_val, typ) in enumerate(cases):
+        t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)],
+                                 ['col'])
+        path = str(tempdir / ('example{}.parquet'.format(i)))
+        pq.write_table(t, path, version='2.0')
+        pf = pq.ParquetFile(path)
+        stats = pf.metadata.row_group(0).column(0).statistics
+        assert stats.min == min_val
+        assert stats.max == max_val
+
+
+def test_parquet_write_disable_statistics(tempdir):
+    table = pa.Table.from_pydict(
+        OrderedDict([
+            ('a', pa.array([1, 2, 3])),
+            ('b', pa.array(['a', 'b', 'c']))
+        ])
+    )
+    _write_table(table, tempdir / 'data.parquet')
+    meta = pq.read_metadata(tempdir / 'data.parquet')
+    for col in [0, 1]:
+        cc = meta.row_group(0).column(col)
+        assert cc.is_stats_set is True
+        assert cc.statistics is not None
+
+    _write_table(table, tempdir / 'data2.parquet', write_statistics=False)
+    meta = pq.read_metadata(tempdir / 'data2.parquet')
+    for col in [0, 1]:
+        cc = meta.row_group(0).column(col)
+        assert cc.is_stats_set is False
+        assert cc.statistics is None
+
+    _write_table(table, tempdir / 'data3.parquet', write_statistics=['a'])
+    meta = pq.read_metadata(tempdir / 'data3.parquet')
+    cc_a = meta.row_group(0).column(0)
+    cc_b = meta.row_group(0).column(1)
+    assert cc_a.is_stats_set is True
+    assert cc_b.is_stats_set is False
+    assert cc_a.statistics is not None
+    assert cc_b.statistics is None
+
+
+@pytest.mark.pandas
+def test_pass_separate_metadata():
+    # ARROW-471
+    df = alltypes_sample(size=10000)
+
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, compression='snappy', version='2.0')
+
+    buf.seek(0)
+    metadata = pq.read_metadata(buf)
+
+    buf.seek(0)
+
+    fileh = pq.ParquetFile(buf, metadata=metadata)
+
+    tm.assert_frame_equal(df, fileh.read().to_pandas())
+
+
+def test_field_id_metadata():
+    # ARROW-7080
+    table = pa.table([pa.array([1], type='int32'),
+                      pa.array([[]], type=pa.list_(pa.int32())),
+                      pa.array([b'boo'], type='binary')],
+                     ['f0', 'f1', 'f2'])
+
+    bio = pa.BufferOutputStream()
+    pq.write_table(table, bio)
+    contents = bio.getvalue()
+
+    pf = pq.ParquetFile(pa.BufferReader(contents))
+    schema = pf.schema_arrow
+
+    # Expected Parquet schema for reference
+    #
+    # required group field_id=0 schema {
+    #   optional int32 field_id=1 f0;
+    #   optional group field_id=2 f1 (List) {
+    #     repeated group field_id=3 list {
+    #       optional int32 field_id=4 item;
+    #     }
+    #   }
+    #   optional binary field_id=5 f2;
+    # }
+
+    field_name = b'PARQUET:field_id'
+    assert schema[0].metadata[field_name] == b'1'
+
+    list_field = schema[1]
+    assert list_field.metadata[field_name] == b'2'
+
+    list_item_field = list_field.type.value_field
+    assert list_item_field.metadata[field_name] == b'4'
+
+    assert schema[2].metadata[field_name] == b'5'
+
+
+@pytest.mark.pandas
+def test_read_metadata_files(tempdir):

Review comment:
       In the original test_parquet.py there was another test, `test_read_common_metadata_files`, which is very similar to this one, so I think we should keep them together (probably in test_dataset.py, as it is explicitly testing the `_(common)_metadata` files in partitioned datasets, not specifically the FileMetadata object itself).
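
For reference, a sketch of how the `_metadata` / `_common_metadata` sidecar files discussed here are produced. It follows the recipe from the pyarrow docs; the dataset is unpartitioned and the paths are illustrative:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'a': [1, 2, 3]})

collector = []
pq.write_to_dataset(table, 'dataset_root', metadata_collector=collector)

# _common_metadata carries only the schema; _metadata additionally carries
# the row-group metadata gathered while the pieces were written
pq.write_metadata(table.schema, 'dataset_root/_common_metadata')
pq.write_metadata(table.schema, 'dataset_root/_metadata',
                  metadata_collector=collector)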

##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _test_read_common_metadata_files(fs, base_path):

Review comment:
       Hmm, but since that is even further away (not even in this directory), if possible I would still move them to `test_dataset.py` where they are used more
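
As a usage note, a minimal sketch of how a test module drives the `_check_roundtrip` helper shown above. The import path assumes the layout introduced in this PR; the table and keyword arguments are illustrative:

import pyarrow as pa

from pyarrow.tests.parquet.common import _check_roundtrip


def test_roundtrip_sketch():
    table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
    # writes to an in-memory buffer, reads back twice and asserts equality;
    # extra keyword arguments are forwarded to pq.write_table
    _check_roundtrip(table, compression='snappy')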

##########
File path: python/pyarrow/tests/parquet/test_basic.py
##########
@@ -0,0 +1,1063 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import decimal
+import io
+import os
+
+import numpy as np
+import pyarrow as pa
+import pytest
+from pyarrow import fs
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
+                                          make_sample_file,
+                                          parametrize_legacy_dataset)
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (_read_table, _test_dataframe,
+                                              _write_table)
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+    from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
+                                               dataframe_with_lists)
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+def test_large_binary():
+    data = [b'foo', b'bar'] * 50
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+
+
+@pytest.mark.large_memory
+def test_large_binary_huge():
+    s = b'xy' * 997
+    data = [s] * ((1 << 33) // len(s))
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+        del arr, table
+
+
+@pytest.mark.large_memory
+def test_large_binary_overflow():
+    s = b'x' * (1 << 31)
+    arr = pa.array([s], type=pa.large_binary())
+    table = pa.Table.from_arrays([arr], names=['strs'])
+    for use_dictionary in [False, True]:
+        writer = pa.BufferOutputStream()
+        with pytest.raises(
+                pa.ArrowInvalid,
+                match="Parquet cannot store strings with size 2GB or more"):
+            _write_table(table, writer, use_dictionary=use_dictionary)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dtype', [int, float])
+def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
+    filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
+    data = [pa.array(list(map(dtype, range(5))))]
+    table = pa.Table.from_arrays(data, names=['a'])
+    _write_table(table, filename)
+    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+    for i in range(table.num_columns):
+        col_written = table[i]
+        col_read = table_read[i]
+        assert table.field(i).name == table_read.field(i).name
+        assert col_read.num_chunks == 1
+        data_written = col_written.chunk(0)
+        data_read = col_read.chunk(0)
+        assert data_written.equals(data_read)
+
+
+def test_parquet_invalid_version(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    with pytest.raises(ValueError, match="Unsupported Parquet format version"):
+        _write_table(table, tempdir / 'test_version.parquet', version="2.2")
+    with pytest.raises(ValueError, match="Unsupported Parquet data page " +
+                       "version"):
+        _write_table(table, tempdir / 'test_version.parquet',
+                     data_page_version="2.2")
+
+
+@parametrize_legacy_dataset
+def test_set_data_page_size(use_legacy_dataset):
+    arr = pa.array([1, 2, 3] * 100000)
+    t = pa.Table.from_arrays([arr], names=['f0'])
+
+    # 128K, 512K
+    page_sizes = [2 << 16, 2 << 18]
+    for target_page_size in page_sizes:
+        _check_roundtrip(t, data_page_size=target_page_size,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_chunked_table_write(use_legacy_dataset):
+    # ARROW-232
+    tables = []
+    batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10))
+    tables.append(pa.Table.from_batches([batch] * 3))
+    df, _ = dataframe_with_lists()
+    batch = pa.RecordBatch.from_pandas(df)
+    tables.append(pa.Table.from_batches([batch] * 3))
+
+    for data_page_version in ['1.0', '2.0']:
+        for use_dictionary in [True, False]:
+            for table in tables:
+                _check_roundtrip(
+                    table, version='2.0',
+                    use_legacy_dataset=use_legacy_dataset,
+                    data_page_version=data_page_version,
+                    use_dictionary=use_dictionary)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_memory_map(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'memory_map': True},
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.0')
+    table_read = pq.read_pandas(filename, memory_map=True,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_enable_buffered_stream(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.0')
+    table_read = pq.read_pandas(filename, buffer_size=4096,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_special_chars_filename(tempdir, use_legacy_dataset):
+    table = pa.Table.from_arrays([pa.array([42])], ["ints"])
+    filename = "foo # bar"
+    path = tempdir / filename
+    assert not path.exists()
+    _write_table(table, str(path))
+    assert path.exists()
+    table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.slow
+def test_file_with_over_int16_max_row_groups():
+    # PARQUET-1857: Parquet encryption support introduced an INT16_MAX upper
+    # limit on the number of row groups, but this limit only impacts files with
+    # encrypted row group metadata because of the int16 row group ordinal used
+    # in the Parquet Thrift metadata. Unencrypted files are not impacted, so
+    # this test checks that it works (even if it isn't a good idea)
+    t = pa.table([list(range(40000))], names=['f0'])
+    _check_roundtrip(t, row_group_size=1)
+
+
+@parametrize_legacy_dataset
+def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
+    # Reproduce failure in ARROW-5630
+    typ = pa.list_(pa.field("item", pa.float32(), False))
+    num_rows = 10000
+    t = pa.table([
+        pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
+                  (num_rows // 10)), type=typ)
+    ], ['a'])
+    _check_roundtrip(
+        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multiple_path_types(tempdir, use_legacy_dataset):
+    # Test compatibility with PEP 519 path-like objects
+    path = tempdir / 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+    # Test compatibility with plain string paths
+    path = str(tempdir) + 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.dataset
+@parametrize_legacy_dataset
+@pytest.mark.parametrize("filesystem", [
+    None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
+])
+def test_relative_paths(tempdir, use_legacy_dataset, filesystem):
+    # reading and writing from relative paths
+    table = pa.table({"a": [1, 2, 3]})
+
+    # reading
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    with util.change_cwd(tempdir):
+        result = pq.read_table("data.parquet", filesystem=filesystem,
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    # writing
+    with util.change_cwd(tempdir):
+        pq.write_table(table, "data2.parquet", filesystem=filesystem)
+    result = pq.read_table(tempdir / "data2.parquet")
+    assert result.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_read_non_existing_file(use_legacy_dataset):
+    # ensure we have a proper error message
+    with pytest.raises(FileNotFoundError):
+        pq.read_table('i-am-not-existing.parquet')
+
+
+@parametrize_legacy_dataset
+def test_parquet_read_from_buffer(tempdir, use_legacy_dataset):
+    # reading from a buffer from python's open()
+    table = pa.table({"a": [1, 2, 3]})
+    pq.write_table(table, str(tempdir / "data.parquet"))
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(pa.PythonFile(f),
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+
+    writer = pq.ParquetWriter(out, arrow_table.schema, version='2.0')
+
+    frames = []
+    for i in range(10):
+        df['unique_id'] = i
+        arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+        writer.write_table(arrow_table)
+
+        frames.append(df.copy())
+
+    writer.close()
+
+    buf = out.getvalue()
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@parametrize_legacy_dataset
+def test_byte_stream_split(use_legacy_dataset):
+    # This is only a smoke test.
+    arr_float = pa.array(list(map(float, range(100))))
+    arr_int = pa.array(list(map(int, range(100))))
+    data_float = [arr_float, arr_float]
+    table = pa.Table.from_arrays(data_float, names=['a', 'b'])
+
+    # Check with byte_stream_split for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=False, use_byte_stream_split=True)
+
+    # Check with byte_stream_split for column 'b' and dictionary
+    # for column 'a'.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a'],
+                     use_byte_stream_split=['b'])
+
+    # Check with a collision for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a', 'b'],
+                     use_byte_stream_split=['a', 'b'])
+
+    # Check with mixed column types.
+    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
+                                       names=['a', 'b'])
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=['b'],
+                     use_byte_stream_split=['a'])
+
+    # Try to use the wrong data type with the byte_stream_split encoding.
+    # This should throw an exception.
+    table = pa.Table.from_arrays([arr_int], names=['tmp'])
+    with pytest.raises(IOError):
+        _check_roundtrip(table, expected=table, use_byte_stream_split=True,
+                         use_dictionary=False,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_compression_level(use_legacy_dataset):
+    arr = pa.array(list(map(int, range(1000))))
+    data = [arr, arr]
+    table = pa.Table.from_arrays(data, names=['a', 'b'])
+
+    # Check one compression level.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=1,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check another one to make sure that compression_level=1 does not
+    # coincide with the default one in Arrow.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=5,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression per column
+    _check_roundtrip(table, expected=table,
+                     compression={'a': "gzip", 'b': "snappy"},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression level per column
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level={'a': 2, 'b': 3},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that specifying a compression level for a codec which does not
+    # allow specifying one results in an error.
+    # Uncompressed, snappy, lz4 and lzo do not support specifying a compression
+    # level.
+    # GZIP (zlib) allows specifying a compression level, but up to
+    # version 1.2.11 the valid range is [-1, 9].
+    invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
+                            ("None", 444), ("lzo", 14)]
+    buf = io.BytesIO()
+    for (codec, level) in invalid_combinations:
+        with pytest.raises((ValueError, OSError)):
+            _write_table(table, buf, compression=codec,
+                         compression_level=level)
+
+
+@pytest.mark.pandas
+def test_compare_schemas():

Review comment:
       This one can probably go to `test_metadata`, as the ParquetSchema is part of the FileMetadata
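
For context, a small sketch of the relationship referred to here: the ParquetSchema hangs off the FileMetaData and is reachable from either entry point (the file name is illustrative):

import pyarrow as pa
import pyarrow.parquet as pq

pq.write_table(pa.table({'a': [1, 2, 3]}), 'example.parquet')

meta = pq.read_metadata('example.parquet')        # pq.FileMetaData
parquet_schema = meta.schema                      # pq.ParquetSchema
assert pq.ParquetFile('example.parquet').schema.equals(parquet_schema)
arrow_schema = parquet_schema.to_arrow_schema()   # back to a pyarrow.Schema
assert arrow_schema.field('a').type == pa.int64()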

##########
File path: python/pyarrow/tests/parquet/test_basic.py
##########
@@ -0,0 +1,1063 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import decimal
+import io
+import os
+
+import numpy as np
+import pyarrow as pa
+import pytest
+from pyarrow import fs
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
+                                          make_sample_file,
+                                          parametrize_legacy_dataset)
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (_read_table, _test_dataframe,
+                                              _write_table)
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+    from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
+                                               dataframe_with_lists)
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+def test_large_binary():
+    data = [b'foo', b'bar'] * 50
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+
+
+@pytest.mark.large_memory
+def test_large_binary_huge():
+    s = b'xy' * 997
+    data = [s] * ((1 << 33) // len(s))
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+        del arr, table
+
+
+@pytest.mark.large_memory
+def test_large_binary_overflow():
+    s = b'x' * (1 << 31)
+    arr = pa.array([s], type=pa.large_binary())
+    table = pa.Table.from_arrays([arr], names=['strs'])
+    for use_dictionary in [False, True]:
+        writer = pa.BufferOutputStream()
+        with pytest.raises(
+                pa.ArrowInvalid,
+                match="Parquet cannot store strings with size 2GB or more"):
+            _write_table(table, writer, use_dictionary=use_dictionary)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dtype', [int, float])
+def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
+    filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
+    data = [pa.array(list(map(dtype, range(5))))]
+    table = pa.Table.from_arrays(data, names=['a'])
+    _write_table(table, filename)
+    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+    for i in range(table.num_columns):
+        col_written = table[i]
+        col_read = table_read[i]
+        assert table.field(i).name == table_read.field(i).name
+        assert col_read.num_chunks == 1
+        data_written = col_written.chunk(0)
+        data_read = col_read.chunk(0)
+        assert data_written.equals(data_read)
+
+
+def test_parquet_invalid_version(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    with pytest.raises(ValueError, match="Unsupported Parquet format version"):
+        _write_table(table, tempdir / 'test_version.parquet', version="2.2")
+    with pytest.raises(ValueError, match="Unsupported Parquet data page " +
+                       "version"):
+        _write_table(table, tempdir / 'test_version.parquet',
+                     data_page_version="2.2")
+
+
+@parametrize_legacy_dataset
+def test_set_data_page_size(use_legacy_dataset):
+    arr = pa.array([1, 2, 3] * 100000)
+    t = pa.Table.from_arrays([arr], names=['f0'])
+
+    # 128K, 512K
+    page_sizes = [2 << 16, 2 << 18]
+    for target_page_size in page_sizes:
+        _check_roundtrip(t, data_page_size=target_page_size,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_chunked_table_write(use_legacy_dataset):
+    # ARROW-232
+    tables = []
+    batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10))
+    tables.append(pa.Table.from_batches([batch] * 3))
+    df, _ = dataframe_with_lists()
+    batch = pa.RecordBatch.from_pandas(df)
+    tables.append(pa.Table.from_batches([batch] * 3))
+
+    for data_page_version in ['1.0', '2.0']:
+        for use_dictionary in [True, False]:
+            for table in tables:
+                _check_roundtrip(
+                    table, version='2.0',
+                    use_legacy_dataset=use_legacy_dataset,
+                    data_page_version=data_page_version,
+                    use_dictionary=use_dictionary)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_memory_map(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'memory_map': True},
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.0')
+    table_read = pq.read_pandas(filename, memory_map=True,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_enable_buffered_stream(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.0')
+    table_read = pq.read_pandas(filename, buffer_size=4096,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_special_chars_filename(tempdir, use_legacy_dataset):
+    table = pa.Table.from_arrays([pa.array([42])], ["ints"])
+    filename = "foo # bar"
+    path = tempdir / filename
+    assert not path.exists()
+    _write_table(table, str(path))
+    assert path.exists()
+    table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.slow
+def test_file_with_over_int16_max_row_groups():
+    # PARQUET-1857: Parquet encryption support introduced an INT16_MAX upper
+    # limit on the number of row groups, but this limit only impacts files with
+    # encrypted row group metadata because of the int16 row group ordinal used
+    # in the Parquet Thrift metadata. Unencrypted files are not impacted, so
+    # this test checks that it works (even if it isn't a good idea)
+    t = pa.table([list(range(40000))], names=['f0'])
+    _check_roundtrip(t, row_group_size=1)
+
+
+@parametrize_legacy_dataset
+def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
+    # Reproduce failure in ARROW-5630
+    typ = pa.list_(pa.field("item", pa.float32(), False))
+    num_rows = 10000
+    t = pa.table([
+        pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
+                  (num_rows // 10)), type=typ)
+    ], ['a'])
+    _check_roundtrip(
+        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multiple_path_types(tempdir, use_legacy_dataset):
+    # Test compatibility with PEP 519 path-like objects
+    path = tempdir / 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+    # Test compatibility with plain string paths
+    path = str(tempdir) + 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.dataset
+@parametrize_legacy_dataset
+@pytest.mark.parametrize("filesystem", [
+    None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
+])
+def test_relative_paths(tempdir, use_legacy_dataset, filesystem):
+    # reading and writing from relative paths
+    table = pa.table({"a": [1, 2, 3]})
+
+    # reading
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    with util.change_cwd(tempdir):
+        result = pq.read_table("data.parquet", filesystem=filesystem,
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    # writing
+    with util.change_cwd(tempdir):
+        pq.write_table(table, "data2.parquet", filesystem=filesystem)
+    result = pq.read_table(tempdir / "data2.parquet")
+    assert result.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_read_non_existing_file(use_legacy_dataset):
+    # ensure we have a proper error message
+    with pytest.raises(FileNotFoundError):
+        pq.read_table('i-am-not-existing.parquet')
+
+
+@parametrize_legacy_dataset
+def test_parquet_read_from_buffer(tempdir, use_legacy_dataset):
+    # reading from a buffer from python's open()
+    table = pa.table({"a": [1, 2, 3]})
+    pq.write_table(table, str(tempdir / "data.parquet"))
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(pa.PythonFile(f),
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+
+    writer = pq.ParquetWriter(out, arrow_table.schema, version='2.0')
+
+    frames = []
+    for i in range(10):
+        df['unique_id'] = i
+        arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+        writer.write_table(arrow_table)
+
+        frames.append(df.copy())
+
+    writer.close()
+
+    buf = out.getvalue()
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@parametrize_legacy_dataset
+def test_byte_stream_split(use_legacy_dataset):
+    # This is only a smoke test.
+    arr_float = pa.array(list(map(float, range(100))))
+    arr_int = pa.array(list(map(int, range(100))))
+    data_float = [arr_float, arr_float]
+    table = pa.Table.from_arrays(data_float, names=['a', 'b'])
+
+    # Check with byte_stream_split for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=False, use_byte_stream_split=True)
+
+    # Check with byte_stream_split for column 'b' and dictionary
+    # for column 'a'.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a'],
+                     use_byte_stream_split=['b'])
+
+    # Check with a collision for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a', 'b'],
+                     use_byte_stream_split=['a', 'b'])
+
+    # Check with mixed column types.
+    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
+                                       names=['a', 'b'])
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=['b'],
+                     use_byte_stream_split=['a'])
+
+    # Try to use the wrong data type with the byte_stream_split encoding.
+    # This should throw an exception.
+    table = pa.Table.from_arrays([arr_int], names=['tmp'])
+    with pytest.raises(IOError):
+        _check_roundtrip(table, expected=table, use_byte_stream_split=True,
+                         use_dictionary=False,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_compression_level(use_legacy_dataset):
+    arr = pa.array(list(map(int, range(1000))))
+    data = [arr, arr]
+    table = pa.Table.from_arrays(data, names=['a', 'b'])
+
+    # Check one compression level.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=1,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check another one to make sure that compression_level=1 does not
+    # coincide with the default one in Arrow.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=5,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression per column
+    _check_roundtrip(table, expected=table,
+                     compression={'a': "gzip", 'b': "snappy"},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression level per column
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level={'a': 2, 'b': 3},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that specifying a compression level for a codec which does not
+    # allow specifying one results in an error.
+    # Uncompressed, snappy, lz4 and lzo do not support specifying a compression
+    # level.
+    # GZIP (zlib) allows specifying a compression level, but up to
+    # version 1.2.11 the valid range is [-1, 9].
+    invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
+                            ("None", 444), ("lzo", 14)]
+    buf = io.BytesIO()
+    for (codec, level) in invalid_combinations:
+        with pytest.raises((ValueError, OSError)):
+            _write_table(table, buf, compression=codec,
+                         compression_level=level)
+
+
+@pytest.mark.pandas
+def test_compare_schemas():
+    df = alltypes_sample(size=10000)
+
+    fileh = make_sample_file(df)
+    fileh2 = make_sample_file(df)
+    fileh3 = make_sample_file(df[df.columns[::2]])
+
+    # ParquetSchema
+    assert isinstance(fileh.schema, pq.ParquetSchema)
+    assert fileh.schema.equals(fileh.schema)
+    assert fileh.schema == fileh.schema
+    assert fileh.schema.equals(fileh2.schema)
+    assert fileh.schema == fileh2.schema
+    assert fileh.schema != 'arbitrary object'
+    assert not fileh.schema.equals(fileh3.schema)
+    assert fileh.schema != fileh3.schema
+
+    # ColumnSchema
+    assert isinstance(fileh.schema[0], pq.ColumnSchema)
+    assert fileh.schema[0].equals(fileh.schema[0])
+    assert fileh.schema[0] == fileh.schema[0]
+    assert not fileh.schema[0].equals(fileh.schema[1])
+    assert fileh.schema[0] != fileh.schema[1]
+    assert fileh.schema[0] != 'arbitrary object'
+
+
+def test_validate_schema_write_table(tempdir):
+    # ARROW-2926
+    simple_fields = [
+        pa.field('POS', pa.uint32()),
+        pa.field('desc', pa.string())
+    ]
+
+    simple_schema = pa.schema(simple_fields)
+
+    # simple_table schema does not match simple_schema
+    simple_from_array = [pa.array([1]), pa.array(['bla'])]
+    simple_table = pa.Table.from_arrays(simple_from_array, ['POS', 'desc'])
+
+    path = tempdir / 'simple_validate_schema.parquet'
+
+    with pq.ParquetWriter(path, simple_schema,
+                          version='2.0',
+                          compression='snappy', flavor='spark') as w:
+        with pytest.raises(ValueError):
+            w.write_table(simple_table)
+
+
+@pytest.mark.pandas
+def test_column_of_arrays(tempdir):
+    df, schema = dataframe_with_arrays()
+
+    filename = tempdir / 'pandas_roundtrip.parquet'
+    arrow_table = pa.Table.from_pandas(df, schema=schema)
+    _write_table(arrow_table, filename, version="2.0", coerce_timestamps='ms')
+    table_read = _read_table(filename)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+def test_column_of_lists(tempdir):
+    df, schema = dataframe_with_lists(parquet_compatible=True)
+
+    filename = tempdir / 'pandas_roundtrip.parquet'
+    arrow_table = pa.Table.from_pandas(df, schema=schema)
+    _write_table(arrow_table, filename, version='2.0')
+    table_read = _read_table(filename)
+    df_read = table_read.to_pandas()
+
+    tm.assert_frame_equal(df, df_read)
+
+
+def test_large_list_records():
+    # This was fixed in PARQUET-1100
+
+    list_lengths = np.random.randint(0, 500, size=50)
+    list_lengths[::10] = 0
+
+    list_values = [list(map(int, np.random.randint(0, 100, size=x)))
+                   if i % 8 else None
+                   for i, x in enumerate(list_lengths)]
+
+    a1 = pa.array(list_values)
+
+    table = pa.Table.from_arrays([a1], ['int_lists'])
+    _check_roundtrip(table)
+
+
+def test_sanitized_spark_field_names():
+    a0 = pa.array([0, 1, 2, 3, 4])
+    name = 'prohib; ,\t{}'
+    table = pa.Table.from_arrays([a0], [name])
+
+    result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'})
+
+    expected_name = 'prohib______'
+    assert result.schema[0].name == expected_name
+
+
+def test_fixed_size_binary():
+    t0 = pa.binary(10)
+    data = [b'fooooooooo', None, b'barooooooo', b'quxooooooo']
+    a0 = pa.array(data, type=t0)
+
+    table = pa.Table.from_arrays([a0],
+                                 ['binary[10]'])
+    _check_roundtrip(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multithreaded_read(use_legacy_dataset):
+    df = alltypes_sample(size=10000)
+
+    table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(table, buf, compression='SNAPPY', version='2.0')
+
+    buf.seek(0)
+    table1 = _read_table(
+        buf, use_threads=True, use_legacy_dataset=use_legacy_dataset)
+
+    buf.seek(0)
+    table2 = _read_table(
+        buf, use_threads=False, use_legacy_dataset=use_legacy_dataset)
+
+    assert table1.equals(table2)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_min_chunksize(use_legacy_dataset):
+    data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
+    table = pa.Table.from_pandas(data.reset_index())
+
+    buf = io.BytesIO()
+    _write_table(table, buf, chunk_size=-1)
+
+    buf.seek(0)
+    result = _read_table(buf, use_legacy_dataset=use_legacy_dataset)
+
+    assert result.equals(table)
+
+    with pytest.raises(ValueError):
+        _write_table(table, buf, chunk_size=0)
+
+
+@pytest.mark.pandas
+def test_read_single_row_group():
+    # ARROW-471
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+
+    pf = pq.ParquetFile(buf)
+
+    assert pf.num_row_groups == K
+
+    row_groups = [pf.read_row_group(i) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    tm.assert_frame_equal(df, result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_single_row_group_with_column_subset():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
+    cols = list(df.columns[:2])
+    row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+    # ARROW-4267: Selection of duplicate columns still leads to these columns
+    # being read uniquely.
+    row_groups = [pf.read_row_group(i, columns=cols + cols) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_multiple_row_groups():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+
+    pf = pq.ParquetFile(buf)
+
+    assert pf.num_row_groups == K
+
+    result = pf.read_row_groups(range(K))
+    tm.assert_frame_equal(df, result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_multiple_row_groups_with_column_subset():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
+    cols = list(df.columns[:2])
+    result = pf.read_row_groups(range(K), columns=cols)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+    # ARROW-4267: Selection of duplicate columns still leads to these columns
+    # being read uniquely.
+    result = pf.read_row_groups(range(K), columns=cols + cols)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_scan_contents():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
+    assert pf.scan_contents() == 10000
+    assert pf.scan_contents(df.columns[:4]) == 10000
+
+
+@pytest.mark.pandas
+def test_write_error_deletes_incomplete_file(tempdir):
+    # ARROW-1285
+    df = pd.DataFrame({'a': list('abc'),
+                       'b': list(range(1, 4)),
+                       'c': np.arange(3, 6).astype('u1'),
+                       'd': np.arange(4.0, 7.0, dtype='float64'),
+                       'e': [True, False, True],
+                       'f': pd.Categorical(list('abc')),
+                       'g': pd.date_range('20130101', periods=3),
+                       'h': pd.date_range('20130101', periods=3,
+                                          tz='US/Eastern'),
+                       'i': pd.date_range('20130101', periods=3, freq='ns')})
+
+    pdf = pa.Table.from_pandas(df)
+
+    filename = tempdir / 'tmp_file'
+    try:
+        _write_table(pdf, filename)
+    except pa.ArrowException:
+        pass
+
+    assert not filename.exists()
+
+
+@parametrize_legacy_dataset
+def test_read_non_existent_file(tempdir, use_legacy_dataset):
+    path = 'non-existent-file.parquet'
+    try:
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+    except Exception as e:
+        assert path in e.args[0]
+
+
+@parametrize_legacy_dataset
+def test_read_table_doesnt_warn(datadir, use_legacy_dataset):
+    with pytest.warns(None) as record:
+        pq.read_table(datadir / 'v0.7.1.parquet',
+                      use_legacy_dataset=use_legacy_dataset)
+
+    assert len(record) == 0
+
+
+@pytest.mark.large_memory
+def test_large_table_int32_overflow():
+    size = np.iinfo('int32').max + 1
+
+    arr = np.ones(size, dtype='uint8')
+
+    parr = pa.array(arr, type=pa.uint8())
+
+    table = pa.Table.from_arrays([parr], names=['one'])
+    f = io.BytesIO()
+    _write_table(table, f)
+
+
+def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs):
+    stream = pa.BufferOutputStream()
+    _write_table(table, stream, **write_kwargs)
+    buf = stream.getvalue()
+    return _read_table(buf, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_byte_array_exactly_2gb(use_legacy_dataset):
+    # Test edge case reported in ARROW-3762
+    val = b'x' * (1 << 10)
+
+    base = pa.array([val] * ((1 << 21) - 1))
+    cases = [
+        [b'x' * 1023],  # 2^31 - 1
+        [b'x' * 1024],  # 2^31
+        [b'x' * 1025]   # 2^31 + 1
+    ]
+    for case in cases:
+        values = pa.chunked_array([base, pa.array(case)])
+        t = pa.table([values], names=['f0'])
+        result = _simple_table_roundtrip(
+            t, use_legacy_dataset=use_legacy_dataset, use_dictionary=False)
+        assert t.equals(result)
+
+
+@pytest.mark.pandas
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_binary_array_overflow_to_chunked(use_legacy_dataset):
+    # ARROW-3762
+
+    # 2^31 + 1 bytes
+    values = [b'x'] + [
+        b'x' * (1 << 20)
+    ] * 2 * (1 << 10)
+    df = pd.DataFrame({'byte_col': values})
+
+    tbl = pa.Table.from_pandas(df, preserve_index=False)
+    read_tbl = _simple_table_roundtrip(
+        tbl, use_legacy_dataset=use_legacy_dataset)
+
+    col0_data = read_tbl[0]
+    assert isinstance(col0_data, pa.ChunkedArray)
+
+    # Split up into 2GB chunks
+    assert col0_data.num_chunks == 2
+
+    assert tbl.equals(read_tbl)
+
+
+@pytest.mark.pandas
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_list_of_binary_large_cell(use_legacy_dataset):
+    # ARROW-4688
+    data = []
+
+    # TODO(wesm): handle chunked children
+    # 2^31 - 1 bytes in a single cell
+    # data.append([b'x' * (1 << 20)] * 2047 + [b'x' * ((1 << 20) - 1)])
+
+    # A little under 2GB in total, in cells of approximately 10MB each
+    data.extend([[b'x' * 1000000] * 10] * 214)
+
+    arr = pa.array(data)
+    table = pa.Table.from_arrays([arr], ['chunky_cells'])
+    read_table = _simple_table_roundtrip(
+        table, use_legacy_dataset=use_legacy_dataset)
+    assert table.equals(read_table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
+    # ARROW-1684
+    df = pd.DataFrame({
+        'a': [[1, 2, 3], None, [4, 5], []],
+        'b': [[1.], None, None, [6., 7.]],
+    })
+
+    path = str(tempdir / 'nested_convenience.parquet')
+
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    _write_table(table, path)
+
+    read = pq.read_table(
+        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(read.to_pandas(), df[['a']])
+
+    read = pq.read_table(
+        path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(read.to_pandas(), df)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_decimal_roundtrip(tempdir, use_legacy_dataset):
+    num_values = 10
+
+    columns = {}
+    for precision in range(1, 39):
+        for scale in range(0, precision + 1):
+            with util.random_seed(0):
+                random_decimal_values = [
+                    util.randdecimal(precision, scale)
+                    for _ in range(num_values)
+                ]
+            column_name = ('dec_precision_{:d}_scale_{:d}'
+                           .format(precision, scale))
+            columns[column_name] = random_decimal_values
+
+    expected = pd.DataFrame(columns)
+    filename = tempdir / 'decimals.parquet'
+    string_filename = str(filename)
+    table = pa.Table.from_pandas(expected)
+    _write_table(table, string_filename)
+    result_table = _read_table(
+        string_filename, use_legacy_dataset=use_legacy_dataset)
+    result = result_table.to_pandas()
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@pytest.mark.xfail(
+    raises=pa.ArrowException, reason='Parquet does not support negative scale'
+)
+def test_decimal_roundtrip_negative_scale(tempdir):
+    expected = pd.DataFrame({'decimal_num': [decimal.Decimal('1.23E4')]})
+    filename = tempdir / 'decimals.parquet'
+    string_filename = str(filename)
+    t = pa.Table.from_pandas(expected)
+    _write_table(t, string_filename)
+    result_table = _read_table(string_filename)
+    result = result_table.to_pandas()
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_writer_context_obj(tempdir, use_legacy_dataset):
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+
+    with pq.ParquetWriter(out, arrow_table.schema, version='2.0') as writer:
+
+        frames = []
+        for i in range(10):
+            df['unique_id'] = i
+            arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+            writer.write_table(arrow_table)
+
+            frames.append(df.copy())
+
+    buf = out.getvalue()
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_writer_context_obj_with_exception(
+    tempdir, use_legacy_dataset
+):
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+    error_text = 'Artificial Error'
+
+    try:
+        with pq.ParquetWriter(out,
+                              arrow_table.schema,
+                              version='2.0') as writer:
+
+            frames = []
+            for i in range(10):
+                df['unique_id'] = i
+                arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+                writer.write_table(arrow_table)
+                frames.append(df.copy())
+                if i == 5:
+                    raise ValueError(error_text)
+    except Exception as e:
+        assert str(e) == error_text
+
+    buf = out.getvalue()
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_zlib_compression_bug(use_legacy_dataset):
+    # ARROW-3514: "zlib deflate failed, output buffer too small"
+    table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
+    f = io.BytesIO()
+    pq.write_table(table, f, compression='gzip')
+
+    f.seek(0)
+    roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())
+
+
+def test_parquet_file_pass_directory_instead_of_file(tempdir):
+    # ARROW-7208
+    path = tempdir / 'directory'
+    os.mkdir(str(path))
+
+    with pytest.raises(IOError, match="Expected file path"):
+        pq.ParquetFile(path)
+
+
+def test_read_column_invalid_index():
+    table = pa.table([pa.array([4, 5]), pa.array(["foo", "bar"])],
+                     names=['ints', 'strs'])
+    bio = pa.BufferOutputStream()
+    pq.write_table(table, bio)
+    f = pq.ParquetFile(bio.getvalue())
+    assert f.reader.read_column(0).to_pylist() == [4, 5]
+    assert f.reader.read_column(1).to_pylist() == ["foo", "bar"]
+    for index in (-1, 2):
+        with pytest.raises((ValueError, IndexError)):
+            f.reader.read_column(index)
+
+
+@parametrize_legacy_dataset
+def test_parquet_file_too_small(tempdir, use_legacy_dataset):
+    path = str(tempdir / "test.parquet")
+    # TODO(dataset) with datasets API it raises OSError instead
+    with pytest.raises((pa.ArrowInvalid, OSError),
+                       match='size is 0 bytes'):
+        with open(path, 'wb') as f:
+            pass
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+
+    with pytest.raises((pa.ArrowInvalid, OSError),
+                       match='size is 4 bytes'):
+        with open(path, 'wb') as f:
+            f.write(b'ffff')
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.pandas
+def test_filter_before_validate_schema(tempdir, use_legacy_dataset):

Review comment:
       this one can be moved to `test_dataset.py`, I think, since it's a dataset-specific feature that is being tested (although the test uses the `read_table` function, which dispatches to ParquetDataset)
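
       As a rough illustration of that dispatch (hypothetical directory and
       values, not part of this PR): when `read_table` is pointed at a
       directory and given a `filters=` argument, the filter is evaluated by
       the dataset layer rather than by `ParquetFile`, e.g.

           import tempfile

           import pyarrow as pa
           import pyarrow.parquet as pq

           root = tempfile.mkdtemp()

           # write a tiny partitioned dataset
           table = pa.table({"part": ["a", "a", "b"], "value": [1, 2, 3]})
           pq.write_to_dataset(table, root, partition_cols=["part"])

           # read_table on a directory dispatches to the dataset machinery
           # (the use_legacy_dataset=False path needs pyarrow built with the
           # dataset module)
           subset = pq.read_table(root, filters=[("part", "=", "a")],
                                  use_legacy_dataset=False)
           assert sorted(subset.column("value").to_pylist()) == [1, 2]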




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] arw2019 commented on pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
arw2019 commented on pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#issuecomment-740397187


   This is ready for re-review (modulo `pyarrow/tests/test_orc.py` failing on Python / AMD64 MacOS 10.15 Python 3, for a reason I haven't figured out yet)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r536019950



##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _test_read_common_metadata_files(fs, base_path):
+    import pandas as pd
+
+    import pyarrow.parquet as pq
+
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    base_path = str(base_path)
+    data_path = os.path.join(base_path, 'data.parquet')
+
+    table = pa.Table.from_pandas(df)
+
+    with fs.open(data_path, 'wb') as f:
+        _write_table(table, f)
+
+    metadata_path = os.path.join(base_path, '_common_metadata')
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(base_path, filesystem=fs)
+    assert dataset.common_metadata_path == str(metadata_path)
+
+    with fs.open(data_path) as f:
+        common_schema = pq.read_metadata(f).schema
+    assert dataset.schema.equals(common_schema)
+
+    # handle list of one directory
+    dataset2 = pq.ParquetDataset([base_path], filesystem=fs)
+    assert dataset2.schema.equals(dataset.schema)
+
+
+def _random_integers(size, dtype):
+    # We do not generate integers outside the int64 range
+    platform_int_info = np.iinfo('int_')
+    iinfo = np.iinfo(dtype)
+    return np.random.randint(max(iinfo.min, platform_int_info.min),
+                             min(iinfo.max, platform_int_info.max),
+                             size=size).astype(dtype)
+
+
+def _test_dataframe(size=10000, seed=0):
+    import pandas as pd
+
+    np.random.seed(seed)
+    df = pd.DataFrame({
+        'uint8': _random_integers(size, np.uint8),
+        'uint16': _random_integers(size, np.uint16),
+        'uint32': _random_integers(size, np.uint32),
+        'uint64': _random_integers(size, np.uint64),
+        'int8': _random_integers(size, np.int8),
+        'int16': _random_integers(size, np.int16),
+        'int32': _random_integers(size, np.int32),
+        'int64': _random_integers(size, np.int64),
+        'float32': np.random.randn(size).astype(np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'strings': [util.rands(10) for i in range(size)],
+        'all_none': [None] * size,
+        'all_none_category': [None] * size
+    })
+
+    # TODO(PARQUET-1015)
+    # df['all_none_category'] = df['all_none_category'].astype('category')
+    return df
+
+
+def _test_write_to_dataset_with_partitions(base_path,

Review comment:
       Same for, e.g., this one

##########
File path: python/pyarrow/tests/parquet/test_multifile_ds.py
##########
@@ -0,0 +1,1662 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       Maybe we can call this one `test_dataset.py`?

##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _test_read_common_metadata_files(fs, base_path):

Review comment:
       If a function is only used in a single file (like this one, I think), I would personally keep the definition closer to the tests instead of putting it here
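
       For instance, a condensed, hypothetical sketch of that layout (the
       module name and helper below are made up for illustration, not taken
       from this PR): the helper simply becomes a private function of the one
       test module that calls it.

           # e.g. in pyarrow/tests/parquet/test_dataset.py
           import pyarrow as pa
           import pyarrow.parquet as pq


           def _write_common_metadata(base_path, table):
               # used only by the test below, so it can live in this module
               pq.write_metadata(table.schema,
                                 str(base_path / '_common_metadata'))


           def test_common_metadata_roundtrip(tempdir):
               table = pa.table({'a': [1, 2, 3]})
               pq.write_table(table, str(tempdir / 'data.parquet'))
               _write_common_metadata(tempdir, table)
               dataset = pq.ParquetDataset(str(tempdir))
               assert dataset.common_metadata_path == str(
                   tempdir / '_common_metadata')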

##########
File path: python/pyarrow/tests/parquet/test_multifile_ds.py
##########
@@ -0,0 +1,1662 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import os
+from distutils.version import LooseVersion
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow import fs
+from pyarrow.filesystem import FileSystem, LocalFileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (
+    parametrize_legacy_dataset,
+    parametrize_legacy_dataset_fixed, parametrize_legacy_dataset_not_supported)
+from pyarrow.util import guid
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (
+        _read_table, _test_dataframe, _test_read_common_metadata_files,
+        _test_write_to_dataset_no_partitions,
+        _test_write_to_dataset_with_partitions, _write_table)
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+
+except ImportError:
+    pd = tm = None
+
+
+@pytest.mark.pandas
+def test_parquet_piece_read(tempdir):
+    df = _test_dataframe(1000)
+    table = pa.Table.from_pandas(df)
+
+    path = tempdir / 'parquet_piece_read.parquet'
+    _write_table(table, path, version='2.0')
+
+    piece1 = pq.ParquetDatasetPiece(path)
+
+    result = piece1.read()
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+def test_parquet_piece_open_and_get_metadata(tempdir):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df)
+
+    path = tempdir / 'parquet_piece_read.parquet'
+    _write_table(table, path, version='2.0')
+
+    piece = pq.ParquetDatasetPiece(path)
+    table1 = piece.read()
+    assert isinstance(table1, pa.Table)
+    meta1 = piece.get_metadata()
+    assert isinstance(meta1, pq.FileMetaData)
+
+    assert table.equals(table1)
+
+
+def test_parquet_piece_basics():
+    path = '/baz.parq'
+
+    piece1 = pq.ParquetDatasetPiece(path)
+    piece2 = pq.ParquetDatasetPiece(path, row_group=1)
+    piece3 = pq.ParquetDatasetPiece(
+        path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])
+
+    assert str(piece1) == path
+    assert str(piece2) == '/baz.parq | row_group=1'
+    assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1'
+
+    assert piece1 == piece1
+    assert piece2 == piece2
+    assert piece3 == piece3
+    assert piece1 != piece3
+
+
+def test_partition_set_dictionary_type():
+    set1 = pq.PartitionSet('key1', ['foo', 'bar', 'baz'])
+    set2 = pq.PartitionSet('key2', [2007, 2008, 2009])
+
+    assert isinstance(set1.dictionary, pa.StringArray)
+    assert isinstance(set2.dictionary, pa.IntegerArray)
+
+    set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
+    with pytest.raises(TypeError):
+        set3.dictionary
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_directory(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+def test_create_parquet_dataset_multi_threaded(tempdir):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    _partition_test_for_filesystem(fs, base_path)
+
+    manifest = pq.ParquetManifest(base_path, filesystem=fs,
+                                  metadata_nthreads=1)
+    dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
+    assert len(dataset.pieces) > 0
+    partitions = dataset.partitions
+    assert len(partitions.partition_names) > 0
+    assert partitions.partition_names == manifest.partitions.partition_names
+    assert len(partitions.levels) == len(manifest.partitions.levels)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_columns_selection(tempdir, use_legacy_dataset):
+    # ARROW-3861 - do not include partition columns in resulting table when
+    # `columns` keyword was passed without those columns
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+    _partition_test_for_filesystem(fs, base_path)
+
+    dataset = pq.ParquetDataset(
+        base_path, use_legacy_dataset=use_legacy_dataset)
+    result = dataset.read(columns=["values"])
+    if use_legacy_dataset:
+        # ParquetDataset implementation always includes the partition columns
+        # automatically, and we can't easily "fix" this since dask relies on
+        # this behaviour (ARROW-8644)
+        assert result.column_names == ["values", "foo", "bar"]
+    else:
+        assert result.column_names == ["values"]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_equivalency(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1]
+    string_keys = ['a', 'b', 'c']
+    boolean_keys = [True, False]
+    partition_spec = [
+        ['integer', integer_keys],
+        ['string', string_keys],
+        ['boolean', boolean_keys]
+    ]
+
+    df = pd.DataFrame({
+        'integer': np.array(integer_keys, dtype='i4').repeat(15),
+        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+                           3),
+    }, columns=['integer', 'string', 'boolean'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    # Old filters syntax:
+    #  integer == 1 AND string != b AND boolean == True
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[('integer', '=', 1), ('string', '!=', 'b'),
+                 ('boolean', '==', True)],
+        use_legacy_dataset=use_legacy_dataset,
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas().reset_index(drop=True))
+
+    assert 0 not in result_df['integer'].values
+    assert 'b' not in result_df['string'].values
+    assert False not in result_df['boolean'].values
+
+    # filters in disjunctive normal form:
+    #  (integer == 1 AND string != b AND boolean == True) OR
+    #  (integer == 2 AND boolean == False)
+    # TODO(ARROW-3388): boolean columns are reconstructed as string
+    filters = [
+        [
+            ('integer', '=', 1),
+            ('string', '!=', 'b'),
+            ('boolean', '==', 'True')
+        ],
+        [('integer', '=', 0), ('boolean', '==', 'False')]
+    ]
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs, filters=filters,
+        use_legacy_dataset=use_legacy_dataset)
+    table = dataset.read()
+    result_df = table.to_pandas().reset_index(drop=True)
+
+    # Check that all rows in the DF fulfill the filter
+    # Pandas 0.23.x has problems with indexing constant memoryviews in
+    # categoricals. Thus we need to make an explicit copy here with np.array.
+    df_filter_1 = (np.array(result_df['integer']) == 1) \
+        & (np.array(result_df['string']) != 'b') \
+        & (np.array(result_df['boolean']) == 'True')
+    df_filter_2 = (np.array(result_df['integer']) == 0) \
+        & (np.array(result_df['boolean']) == 'False')
+    assert df_filter_1.sum() > 0
+    assert df_filter_2.sum() > 0
+    assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())
+
+    if use_legacy_dataset:
+        # Check for \0 in predicate values. Until they are correctly
+        # implemented in ARROW-3391, they would otherwise lead to weird
+        # results with the current code.
+        with pytest.raises(NotImplementedError):
+            filters = [[('string', '==', b'1\0a')]]
+            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+        with pytest.raises(NotImplementedError):
+            filters = [[('string', '==', '1\0a')]]
+            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+    else:
+        for filters in [[[('string', '==', b'1\0a')]],
+                        [[('string', '==', '1\0a')]]]:
+            dataset = pq.ParquetDataset(
+                base_path, filesystem=fs, filters=filters,
+                use_legacy_dataset=False)
+            assert dataset.read().num_rows == 0
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[
+            ('integers', '<', 4),
+            ('integers', '>', 1),
+        ],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                      .sort_values(by='index')
+                      .reset_index(drop=True))
+
+    result_list = [x for x in map(int, result_df['integers'].values)]
+    assert result_list == [2, 3]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.xfail(
+    # different error with use_legacy_datasets because result_df is no longer
+    # categorical
+    raises=(TypeError, AssertionError),
+    reason='Loss of type information in creation of categoricals.'
+)
+def test_filters_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    date_keys = [
+        datetime.date(2018, 4, 9),
+        datetime.date(2018, 4, 10),
+        datetime.date(2018, 4, 11),
+        datetime.date(2018, 4, 12),
+        datetime.date(2018, 4, 13)
+    ]
+    partition_spec = [
+        ['dates', date_keys]
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'dates': np.array(date_keys, dtype='datetime64'),
+    }, columns=['index', 'dates'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[
+            ('dates', '<', "2018-04-12"),
+            ('dates', '>', "2018-04-10")
+        ],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                      .sort_values(by='index')
+                      .reset_index(drop=True))
+
+    expected = pd.Categorical(
+        np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
+        categories=np.array(date_keys, dtype='datetime64'))
+
+    assert result_df['dates'].values == expected
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_inclusive_integer(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[
+            ('integers', '<=', 3),
+            ('integers', '>=', 2),
+        ],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                 .sort_values(by='index')
+                 .reset_index(drop=True))
+
+    result_list = [int(x) for x in map(int, result_df['integers'].values)]
+    assert result_list == [2, 3]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_inclusive_set(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1]
+    string_keys = ['a', 'b', 'c']
+    boolean_keys = [True, False]
+    partition_spec = [
+        ['integer', integer_keys],
+        ['string', string_keys],
+        ['boolean', boolean_keys]
+    ]
+
+    df = pd.DataFrame({
+        'integer': np.array(integer_keys, dtype='i4').repeat(15),
+        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+                           3),
+    }, columns=['integer', 'string', 'boolean'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[('integer', 'in', {1}), ('string', 'in', {'a', 'b'}),
+                 ('boolean', 'in', {True})],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas().reset_index(drop=True))
+
+    assert 0 not in result_df['integer'].values
+    assert 'c' not in result_df['string'].values
+    assert False not in result_df['boolean'].values
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_invalid_pred_op(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(base_path,
+                          filesystem=fs,
+                          filters=[('integers', '=<', 3), ],
+                          use_legacy_dataset=use_legacy_dataset)
+
+    if use_legacy_dataset:
+        with pytest.raises(ValueError):
+            pq.ParquetDataset(base_path,
+                              filesystem=fs,
+                              filters=[('integers', 'in', set()), ],
+                              use_legacy_dataset=use_legacy_dataset)
+    else:
+        # Dataset API returns empty table instead
+        dataset = pq.ParquetDataset(base_path,
+                                    filesystem=fs,
+                                    filters=[('integers', 'in', set()), ],
+                                    use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().num_rows == 0
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(base_path,
+                          filesystem=fs,
+                          filters=[('integers', '!=', {3})],
+                          use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_filters_invalid_column(tempdir, use_legacy_dataset):
+    # ARROW-5572 - raise error on invalid name in filter specification
+    # works with new dataset / xfail with legacy implementation
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [['integers', integer_keys]]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    msg = "Field named 'non_existent_column' not found"
+    with pytest.raises(ValueError, match=msg):
+        pq.ParquetDataset(base_path, filesystem=fs,
+                          filters=[('non_existent_column', '<', 3), ],
+                          use_legacy_dataset=use_legacy_dataset).read()
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_read_table(tempdir, use_legacy_dataset):
+    # test that filters keyword is passed through in read_table
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    table = pq.read_table(
+        base_path, filesystem=fs, filters=[('integers', '<', 3)],
+        use_legacy_dataset=use_legacy_dataset)
+    assert table.num_rows == 3
+
+    table = pq.read_table(
+        base_path, filesystem=fs, filters=[[('integers', '<', 3)]],
+        use_legacy_dataset=use_legacy_dataset)
+    assert table.num_rows == 3
+
+    table = pq.read_pandas(
+        base_path, filters=[('integers', '<', 3)],
+        use_legacy_dataset=use_legacy_dataset)
+    assert table.num_rows == 3
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_partition_keys_with_underscores(tempdir, use_legacy_dataset):
+    # ARROW-5666 - partition field values with underscores preserve underscores
+    # xfail with legacy dataset -> they get interpreted as integers
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    string_keys = ["2019_2", "2019_3"]
+    partition_spec = [
+        ['year_week', string_keys],
+    ]
+    N = 2
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'year_week': np.array(string_keys, dtype='object'),
+    }, columns=['index', 'year_week'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, use_legacy_dataset=use_legacy_dataset)
+    result = dataset.read()
+    assert result.column("year_week").to_pylist() == string_keys
+
+
+@pytest.fixture
+def s3_bucket(request, s3_connection, s3_server):
+    boto3 = pytest.importorskip('boto3')
+    botocore = pytest.importorskip('botocore')
+
+    host, port, access_key, secret_key = s3_connection
+    s3 = boto3.resource(
+        's3',
+        endpoint_url='http://{}:{}'.format(host, port),
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key,
+        config=botocore.client.Config(signature_version='s3v4'),
+        region_name='us-east-1'
+    )
+    bucket = s3.Bucket('test-s3fs')
+    try:
+        bucket.create()
+    except Exception:
+        # we get BucketAlreadyOwnedByYou error with fsspec handler
+        pass
+    return 'test-s3fs'
+
+
+@pytest.fixture
+def s3_example_s3fs(s3_connection, s3_server, s3_bucket):
+    s3fs = pytest.importorskip('s3fs')
+
+    host, port, access_key, secret_key = s3_connection
+    fs = s3fs.S3FileSystem(
+        key=access_key,
+        secret=secret_key,
+        client_kwargs={
+            'endpoint_url': 'http://{}:{}'.format(host, port)
+        }
+    )
+
+    test_path = '{}/{}'.format(s3_bucket, guid())
+
+    fs.mkdir(test_path)
+    yield fs, test_path
+    try:
+        fs.rm(test_path, recursive=True)
+    except FileNotFoundError:
+        pass
+
+
+@parametrize_legacy_dataset
+def test_read_s3fs(s3_example_s3fs, use_legacy_dataset):
+    fs, path = s3_example_s3fs
+    path = path + "/test.parquet"
+    table = pa.table({"a": [1, 2, 3]})
+    _write_table(table, path, filesystem=fs)
+
+    result = _read_table(
+        path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+    )
+    assert result.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_read_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
+    fs, directory = s3_example_s3fs
+    path = directory + "/test.parquet"
+    table = pa.table({"a": [1, 2, 3]})
+    _write_table(table, path, filesystem=fs)
+
+    result = _read_table(
+        directory, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+    )
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_read_partitioned_directory_s3fs_wrapper(
+    s3_example_s3fs, use_legacy_dataset
+):
+    import s3fs
+
+    from pyarrow.filesystem import S3FSWrapper
+
+    if s3fs.__version__ >= LooseVersion("0.5"):
+        pytest.skip("S3FSWrapper no longer working for s3fs 0.5+")
+
+    fs, path = s3_example_s3fs
+    with pytest.warns(DeprecationWarning):
+        wrapper = S3FSWrapper(fs)
+    _partition_test_for_filesystem(wrapper, path)
+
+    # Check that we can auto-wrap
+    dataset = pq.ParquetDataset(
+        path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+    )
+    dataset.read()
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
+    fs, path = s3_example_s3fs
+    _partition_test_for_filesystem(
+        fs, path, use_legacy_dataset=use_legacy_dataset
+    )
+
+
+def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
+    foo_keys = [0, 1]
+    bar_keys = ['a', 'b', 'c']
+    partition_spec = [
+        ['foo', foo_keys],
+        ['bar', bar_keys]
+    ]
+    N = 30
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'foo': np.array(foo_keys, dtype='i4').repeat(15),
+        'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
+        'values': np.random.randn(N)
+    }, columns=['index', 'foo', 'bar', 'values'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset)
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                 .sort_values(by='index')
+                 .reset_index(drop=True))
+
+    expected_df = (df.sort_values(by='index')
+                   .reset_index(drop=True)
+                   .reindex(columns=result_df.columns))
+
+    expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+    expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
+
+    assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
+
+    tm.assert_frame_equal(result_df, expected_df)
+
+
+def _generate_partition_directories(fs, base_dir, partition_spec, df):
+    # partition_spec : list of lists, e.g. [['foo', [0, 1, 2]],
+    #                                       ['bar', ['a', 'b', 'c']]]
+    # df : a pandas DataFrame whose matching rows are written to each
+    #      partition
+    DEPTH = len(partition_spec)
+
+    pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/"))
+
+    def _visit_level(base_dir, level, part_keys):
+        name, values = partition_spec[level]
+        for value in values:
+            this_part_keys = part_keys + [(name, value)]
+
+            level_dir = pathsep.join([
+                str(base_dir),
+                '{}={}'.format(name, value)
+            ])
+            fs.mkdir(level_dir)
+
+            if level == DEPTH - 1:
+                # Generate example data
+                file_path = pathsep.join([level_dir, guid()])
+                filtered_df = _filter_partition(df, this_part_keys)
+                part_table = pa.Table.from_pandas(filtered_df)
+                with fs.open(file_path, 'wb') as f:
+                    _write_table(part_table, f)
+                assert fs.exists(file_path)
+
+                file_success = pathsep.join([level_dir, '_SUCCESS'])
+                with fs.open(file_success, 'wb') as f:
+                    pass
+            else:
+                _visit_level(level_dir, level + 1, this_part_keys)
+                file_success = pathsep.join([level_dir, '_SUCCESS'])
+                with fs.open(file_success, 'wb') as f:
+                    pass
+
+    _visit_level(base_dir, 0, [])
+
+
+@pytest.mark.pandas
+def test_read_common_metadata_files(tempdir):
+    fs = LocalFileSystem._get_instance()
+    _test_read_common_metadata_files(fs, tempdir)
+
+
+@pytest.mark.pandas
+def test_read_metadata_files(tempdir):
+    fs = LocalFileSystem._get_instance()
+
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    data_path = tempdir / 'data.parquet'
+
+    table = pa.Table.from_pandas(df)
+
+    with fs.open(data_path, 'wb') as f:
+        _write_table(table, f)
+
+    metadata_path = tempdir / '_metadata'
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(tempdir, filesystem=fs)
+    assert dataset.metadata_path == str(metadata_path)
+
+    with fs.open(data_path) as f:
+        metadata_schema = pq.read_metadata(f).schema
+    assert dataset.schema.equals(metadata_schema)
+
+
+@pytest.mark.pandas
+def test_read_schema(tempdir):
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    data_path = tempdir / 'test.parquet'
+
+    table = pa.Table.from_pandas(df)
+    _write_table(table, data_path)
+
+    read1 = pq.read_schema(data_path)
+    read2 = pq.read_schema(data_path, memory_map=True)
+    assert table.schema.equals(read1)
+    assert table.schema.equals(read2)
+
+    assert table.schema.metadata[b'pandas'] == read1.metadata[b'pandas']
+
+
+def _filter_partition(df, part_keys):
+    predicate = np.ones(len(df), dtype=bool)
+
+    to_drop = []
+    for name, value in part_keys:
+        to_drop.append(name)
+
+        # to avoid pandas warning
+        if isinstance(value, (datetime.date, datetime.datetime)):
+            value = pd.Timestamp(value)
+
+        predicate &= df[name] == value
+
+    return df[predicate].drop(to_drop, axis=1)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_multiple_files(tempdir, use_legacy_dataset):
+    nfiles = 10
+    size = 5
+
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+
+        # Hack so that we don't have a dtype cast in v1 files
+        df['uint32'] = df['uint32'].astype(np.int64)
+
+        path = dirpath / '{}.parquet'.format(i)
+
+        table = pa.Table.from_pandas(df)
+        _write_table(table, path)
+
+        test_data.append(table)
+        paths.append(path)
+
+    # Write a _SUCCESS.crc file
+    (dirpath / '_SUCCESS.crc').touch()
+
+    def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
+        dataset = pq.ParquetDataset(
+            paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
+        return dataset.read(columns=columns, use_threads=use_threads)
+
+    result = read_multiple_files(paths)
+    expected = pa.concat_tables(test_data)
+
+    assert result.equals(expected)
+
+    # Read with provided metadata
+    # TODO(dataset) specifying metadata not yet supported
+    metadata = pq.read_metadata(paths[0])
+    if use_legacy_dataset:
+        result2 = read_multiple_files(paths, metadata=metadata)
+        assert result2.equals(expected)
+
+        result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
+        assert result3.equals(expected)
+    else:
+        with pytest.raises(ValueError, match="no longer supported"):
+            pq.read_table(paths, metadata=metadata, use_legacy_dataset=False)
+
+    # Read column subset
+    to_read = [0, 2, 6, result.num_columns - 1]
+
+    col_names = [result.field(i).name for i in to_read]
+    out = pq.read_table(
+        dirpath, columns=col_names, use_legacy_dataset=use_legacy_dataset
+    )
+    expected = pa.Table.from_arrays([result.column(i) for i in to_read],
+                                    names=col_names,
+                                    metadata=result.schema.metadata)
+    assert out.equals(expected)
+
+    # Read with multiple threads
+    pq.read_table(
+        dirpath, use_threads=True, use_legacy_dataset=use_legacy_dataset
+    )
+
+    # Test failure modes with non-uniform metadata
+    bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
+    bad_apple_path = tempdir / '{}.parquet'.format(guid())
+
+    t = pa.Table.from_pandas(bad_apple)
+    _write_table(t, bad_apple_path)
+
+    if not use_legacy_dataset:
+        # TODO(dataset) Dataset API skips bad files
+        return
+
+    bad_meta = pq.read_metadata(bad_apple_path)
+
+    with pytest.raises(ValueError):
+        read_multiple_files(paths + [bad_apple_path])
+
+    with pytest.raises(ValueError):
+        read_multiple_files(paths, metadata=bad_meta)
+
+    mixed_paths = [bad_apple_path, paths[0]]
+
+    with pytest.raises(ValueError):
+        read_multiple_files(mixed_paths, schema=bad_meta.schema)
+
+    with pytest.raises(ValueError):
+        read_multiple_files(mixed_paths)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_read_pandas(tempdir, use_legacy_dataset):
+    nfiles = 5
+    size = 5
+
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    test_data = []
+    frames = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+        df.index = np.arange(i * size, (i + 1) * size)
+        df.index.name = 'index'
+
+        path = dirpath / '{}.parquet'.format(i)
+
+        table = pa.Table.from_pandas(df)
+        _write_table(table, path)
+        test_data.append(table)
+        frames.append(df)
+        paths.append(path)
+
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+    columns = ['uint8', 'strings']
+    result = dataset.read_pandas(columns=columns).to_pandas()
+    expected = pd.concat([x[columns] for x in frames])
+
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_memory_map(tempdir, use_legacy_dataset):
+    # ARROW-2627: Check that we can use ParquetDataset with memory-mapping
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    df = _test_dataframe(10, seed=0)
+    path = dirpath / '{}.parquet'.format(0)
+    table = pa.Table.from_pandas(df)
+    _write_table(table, path, version='2.0')
+
+    dataset = pq.ParquetDataset(
+        dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset)
+    assert dataset.read().equals(table)
+    if use_legacy_dataset:
+        assert dataset.pieces[0].read().equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    df = _test_dataframe(10, seed=0)
+    path = dirpath / '{}.parquet'.format(0)
+    table = pa.Table.from_pandas(df)
+    _write_table(table, path, version='2.0')
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(
+            dirpath, buffer_size=-64,
+            use_legacy_dataset=use_legacy_dataset)
+
+    for buffer_size in [128, 1024]:
+        dataset = pq.ParquetDataset(
+            dirpath, buffer_size=buffer_size,
+            use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().equals(table)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('preserve_index', [True, False, None])
+def test_dataset_read_pandas_common_metadata(tempdir, preserve_index):
+    # ARROW-1103
+    nfiles = 5
+    size = 5
+
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    test_data = []
+    frames = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+        df.index = pd.Index(np.arange(i * size, (i + 1) * size), name='index')
+
+        path = dirpath / '{}.parquet'.format(i)
+
+        table = pa.Table.from_pandas(df, preserve_index=preserve_index)
+
+        # Obliterate metadata
+        table = table.replace_schema_metadata(None)
+        assert table.schema.metadata is None
+
+        _write_table(table, path)
+        test_data.append(table)
+        frames.append(df)
+        paths.append(path)
+
+    # Write _metadata common file
+    table_for_metadata = pa.Table.from_pandas(
+        df, preserve_index=preserve_index
+    )
+    pq.write_metadata(table_for_metadata.schema, dirpath / '_metadata')
+
+    dataset = pq.ParquetDataset(dirpath)
+    columns = ['uint8', 'strings']
+    result = dataset.read_pandas(columns=columns).to_pandas()
+    expected = pd.concat([x[columns] for x in frames])
+    expected.index.name = (
+        df.index.name if preserve_index is not False else None)
+    tm.assert_frame_equal(result, expected)
+
+
+def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(file_nrows, seed=i)
+        path = base_path / '{}.parquet'.format(i)
+
+        test_data.append(_write_table(df, path))
+        paths.append(path)
+    return paths
+
+
+def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
+    if use_legacy_dataset:
+        assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+    else:
+        paths = [str(path.as_posix()) for path in paths]
+        assert set(paths) == set(dataset._dataset.files)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dir_prefix', ['_', '.'])
+def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    # private directory
+    (dirpath / '{}staging'.format(dir_prefix)).mkdir()
+
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    with (dirpath / '.DS_Store').open('wb') as f:
+        f.write(b'gibberish')
+
+    with (dirpath / '.private').open('wb') as f:
+        f.write(b'gibberish')
+
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    with (dirpath / '_committed_123').open('wb') as f:
+        f.write(b'abcd')
+
+    with (dirpath / '_started_321').open('wb') as f:
+        f.write(b'abcd')
+
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dir_prefix', ['_', '.'])
+def test_ignore_no_private_directories_in_base_path(
+    tempdir, dir_prefix, use_legacy_dataset
+):
+    # ARROW-8427 - don't ignore explicitly listed files if parent directory
+    # is a private directory
+    dirpath = tempdir / "{0}data".format(dir_prefix) / guid()
+    dirpath.mkdir(parents=True)
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset)
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+    # ARROW-9644 - don't ignore full directory with underscore in base path
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_ignore_custom_prefixes(tempdir, use_legacy_dataset):
+    # ARROW-9573 - allow override of default ignore_prefixes
+    part = ["xxx"] * 3 + ["yyy"] * 3
+    table = pa.table([
+        pa.array(range(len(part))),
+        pa.array(part).dictionary_encode(),
+    ], names=['index', '_part'])
+
+    # TODO use_legacy_dataset ARROW-10247
+    pq.write_to_dataset(table, str(tempdir), partition_cols=['_part'])
+
+    private_duplicate = tempdir / '_private_duplicate'
+    private_duplicate.mkdir()
+    pq.write_to_dataset(table, str(private_duplicate),
+                        partition_cols=['_part'])
+
+    read = pq.read_table(
+        tempdir, use_legacy_dataset=use_legacy_dataset,
+        ignore_prefixes=['_private'])
+
+    assert read.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_with_partitions(str(tempdir), use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_schema(
+    tempdir, use_legacy_dataset
+):
+    schema = pa.schema([pa.field('group1', type=pa.string()),
+                        pa.field('group2', type=pa.string()),
+                        pa.field('num', type=pa.int64()),
+                        pa.field('nan', type=pa.int32()),
+                        pa.field('date', type=pa.timestamp(unit='us'))])
+    _test_write_to_dataset_with_partitions(
+        str(tempdir), use_legacy_dataset, schema=schema)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_index_name(
+    tempdir, use_legacy_dataset
+):
+    _test_write_to_dataset_with_partitions(
+        str(tempdir), use_legacy_dataset, index_name='index_name')
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_no_partitions(str(tempdir), use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pathlib(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_with_partitions(
+        tempdir / "test1", use_legacy_dataset)
+    _test_write_to_dataset_no_partitions(
+        tempdir / "test2", use_legacy_dataset)
+
+
+# These tests are failing - see ARROW-10370
+# @pytest.mark.pandas
+# @pytest.mark.s3
+# @parametrize_legacy_dataset
+# def test_write_to_dataset_pathlib_nonlocal(
+#     tempdir, s3_example_s3fs, use_legacy_dataset
+# ):
+#    # pathlib paths are only accepted for local files
+#    fs, _ = s3_example_s3fs
+
+#    with pytest.raises(TypeError, match="path-like objects are only allowed"):
+#         _test_write_to_dataset_with_partitions(
+#             tempdir / "test1", use_legacy_dataset, filesystem=fs)
+
+#    with pytest.raises(TypeError, match="path-like objects are only allowed"):
+#         _test_write_to_dataset_no_partitions(
+#             tempdir / "test2", use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_s3fs(
+    s3_example_s3fs, use_legacy_dataset
+):
+    fs, path = s3_example_s3fs
+
+    _test_write_to_dataset_with_partitions(
+        path, use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions_s3fs(
+    s3_example_s3fs, use_legacy_dataset
+):
+    fs, path = s3_example_s3fs
+
+    _test_write_to_dataset_no_partitions(
+        path, use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_not_supported
+def test_write_to_dataset_with_partitions_and_custom_filenames(
+    tempdir, use_legacy_dataset
+):
+    output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
+                              'group2': list('eefeffgeee'),
+                              'num': list(range(10)),
+                              'nan': [np.nan] * 10,
+                              'date': np.arange('2017-01-01', '2017-01-11',
+                                                dtype='datetime64[D]')})
+    partition_by = ['group1', 'group2']
+    output_table = pa.Table.from_pandas(output_df)
+    path = str(tempdir)
+
+    def partition_filename_callback(keys):
+        return "{}-{}.parquet".format(*keys)
+
+    pq.write_to_dataset(output_table, path,
+                        partition_by, partition_filename_callback,
+                        use_legacy_dataset=use_legacy_dataset)
+
+    dataset = pq.ParquetDataset(path)
+
+    # ARROW-3538: Ensure partition filenames match the given pattern
+    # defined in the local function partition_filename_callback
+    expected_basenames = [
+        'a-e.parquet', 'a-f.parquet',
+        'b-e.parquet', 'b-f.parquet',
+        'b-g.parquet', 'c-e.parquet'
+    ]
+    output_basenames = [os.path.basename(p.path) for p in dataset.pieces]
+
+    assert sorted(expected_basenames) == sorted(output_basenames)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pandas_preserve_extensiondtypes(
+    tempdir, use_legacy_dataset
+):
+    # ARROW-8251 - preserve pandas extension dtypes in roundtrip
+    if LooseVersion(pd.__version__) < "1.0.0":
+        pytest.skip("__arrow_array__ added to pandas in 1.0.0")
+
+    df = pd.DataFrame({'part': 'a', "col": [1, 2, 3]})
+    df['col'] = df['col'].astype("Int64")
+    table = pa.table(df)
+
+    pq.write_to_dataset(
+        table, str(tempdir / "case1"), partition_cols=['part'],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    result = pq.read_table(
+        str(tempdir / "case1"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+    pq.write_to_dataset(
+        table, str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+    )
+    result = pq.read_table(
+        str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    result = pq.read_table(
+        str(tempdir / "data.parquet"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pandas_preserve_index(tempdir, use_legacy_dataset):
+    # ARROW-8251 - preserve pandas index in roundtrip
+
+    df = pd.DataFrame({'part': ['a', 'a', 'b'], "col": [1, 2, 3]})
+    df.index = pd.Index(['a', 'b', 'c'], name="idx")
+    table = pa.table(df)
+    df_cat = df[["col", "part"]].copy()
+    df_cat["part"] = df_cat["part"].astype("category")
+
+    pq.write_to_dataset(
+        table, str(tempdir / "case1"), partition_cols=['part'],
+        use_legacy_dataset=use_legacy_dataset
+    )
+    result = pq.read_table(
+        str(tempdir / "case1"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result, df_cat)
+
+    pq.write_to_dataset(
+        table, str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+    )
+    result = pq.read_table(
+        str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    result = pq.read_table(
+        str(tempdir / "data.parquet"), use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+# TODO(dataset) support pickling
+def _make_dataset_for_pickling(tempdir, N=100):
+    path = tempdir / 'data.parquet'
+    fs = LocalFileSystem._get_instance()
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+    table = pa.Table.from_pandas(df)
+
+    num_groups = 3
+    with pq.ParquetWriter(path, table.schema) as writer:
+        for i in range(num_groups):
+            writer.write_table(table)
+
+    reader = pq.ParquetFile(path)
+    assert reader.metadata.num_row_groups == num_groups
+
+    metadata_path = tempdir / '_metadata'
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(tempdir, filesystem=fs)
+    assert dataset.metadata_path == str(metadata_path)
+
+    return dataset
+
+
+def _assert_dataset_is_picklable(dataset, pickler):
+    def is_pickleable(obj):
+        return obj == pickler.loads(pickler.dumps(obj))
+
+    assert is_pickleable(dataset)
+    assert is_pickleable(dataset.metadata)
+    assert is_pickleable(dataset.metadata.schema)
+    assert len(dataset.metadata.schema)
+    for column in dataset.metadata.schema:
+        assert is_pickleable(column)
+
+    for piece in dataset.pieces:
+        assert is_pickleable(piece)
+        metadata = piece.get_metadata()
+        assert metadata.num_row_groups
+        for i in range(metadata.num_row_groups):
+            assert is_pickleable(metadata.row_group(i))
+
+
+@pytest.mark.pandas
+def test_builtin_pickle_dataset(tempdir, datadir):
+    import pickle
+    dataset = _make_dataset_for_pickling(tempdir)
+    _assert_dataset_is_picklable(dataset, pickler=pickle)
+
+
+@pytest.mark.pandas
+def test_cloudpickle_dataset(tempdir, datadir):
+    cp = pytest.importorskip('cloudpickle')
+    dataset = _make_dataset_for_pickling(tempdir)
+    _assert_dataset_is_picklable(dataset, pickler=cp)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+    None,
+    LocalFileSystem._get_instance(),
+    fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):

Review comment:
       This and the tests below are ParquetWriter-related tests that are not directly related to multi-file datasets, so they can probably be moved elsewhere (either to test_basic or to a separate file).
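
       For illustration, a minimal sketch of what one of these relocated writer tests
       could look like (hypothetical file name test_parquet_writer.py; it assumes the
       shared tempdir fixture and the pyarrow.parquet import used throughout the files
       above, and is not part of this patch):

import pyarrow as pa
import pyarrow.parquet as pq


def test_parquet_writer_roundtrip(tempdir):
    # Write a small table with ParquetWriter and read it back unchanged.
    table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
    path = tempdir / 'writer_roundtrip.parquet'

    with pq.ParquetWriter(path, table.schema) as writer:
        writer.write_table(table)

    result = pq.read_table(path)
    assert result.equals(table)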







[GitHub] [arrow] github-actions[bot] commented on pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#issuecomment-736921959


   https://issues.apache.org/jira/browse/ARROW-9027





[GitHub] [arrow] arw2019 commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
arw2019 commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r541158203



##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _test_read_common_metadata_files(fs, base_path):

Review comment:
       Fair - moved it to `test_dataset.py`
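
       A rough sketch of how the relocated helper might be exercised there
       (hypothetical; it assumes the helper keeps the (fs, base_path) signature
       shown above and now lives in test_dataset.py):

import pytest
from pyarrow.filesystem import LocalFileSystem


@pytest.mark.pandas
def test_read_common_metadata_files(tempdir):
    # _test_read_common_metadata_files is assumed to be defined in this
    # module after the move; tempdir is the shared pytest fixture.
    fs = LocalFileSystem._get_instance()
    _test_read_common_metadata_files(fs, tempdir)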

##########
File path: python/pyarrow/tests/parquet/test_metadata.py
##########
@@ -0,0 +1,475 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import io
+from collections import OrderedDict
+
+import numpy as np
+import pyarrow as pa
+import pytest
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import _write_table
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+@pytest.mark.pandas
+def test_parquet_metadata_api():
+    df = alltypes_sample(size=10000)
+    df = df.reindex(columns=sorted(df.columns))
+    df.index = np.random.randint(0, 1000000, size=len(df))
+
+    fileh = make_sample_file(df)
+    ncols = len(df.columns)
+
+    # Series of sniff tests
+    meta = fileh.metadata
+    repr(meta)
+    assert meta.num_rows == len(df)
+    assert meta.num_columns == ncols + 1  # +1 for index
+    assert meta.num_row_groups == 1
+    assert meta.format_version == '2.0'
+    assert 'parquet-cpp' in meta.created_by
+    assert isinstance(meta.serialized_size, int)
+    assert isinstance(meta.metadata, dict)
+
+    # Schema
+    schema = fileh.schema
+    assert meta.schema is schema
+    assert len(schema) == ncols + 1  # +1 for index
+    repr(schema)
+
+    col = schema[0]
+    repr(col)
+    assert col.name == df.columns[0]
+    assert col.max_definition_level == 1
+    assert col.max_repetition_level == 0
+    assert col.max_repetition_level == 0
+
+    assert col.physical_type == 'BOOLEAN'
+    assert col.converted_type == 'NONE'
+
+    with pytest.raises(IndexError):
+        schema[ncols + 1]  # +1 for index
+
+    with pytest.raises(IndexError):
+        schema[-1]
+
+    # Row group
+    for rg in range(meta.num_row_groups):
+        rg_meta = meta.row_group(rg)
+        assert isinstance(rg_meta, pq.RowGroupMetaData)
+        repr(rg_meta)
+
+        for col in range(rg_meta.num_columns):
+            col_meta = rg_meta.column(col)
+            assert isinstance(col_meta, pq.ColumnChunkMetaData)
+            repr(col_meta)
+
+    with pytest.raises(IndexError):
+        meta.row_group(-1)
+
+    with pytest.raises(IndexError):
+        meta.row_group(meta.num_row_groups + 1)
+
+    rg_meta = meta.row_group(0)
+    assert rg_meta.num_rows == len(df)
+    assert rg_meta.num_columns == ncols + 1  # +1 for index
+    assert rg_meta.total_byte_size > 0
+
+    with pytest.raises(IndexError):
+        col_meta = rg_meta.column(-1)
+
+    with pytest.raises(IndexError):
+        col_meta = rg_meta.column(ncols + 2)
+
+    col_meta = rg_meta.column(0)
+    assert col_meta.file_offset > 0
+    assert col_meta.file_path == ''  # created from BytesIO
+    assert col_meta.physical_type == 'BOOLEAN'
+    assert col_meta.num_values == 10000
+    assert col_meta.path_in_schema == 'bool'
+    assert col_meta.is_stats_set is True
+    assert isinstance(col_meta.statistics, pq.Statistics)
+    assert col_meta.compression == 'SNAPPY'
+    assert col_meta.encodings == ('PLAIN', 'RLE')
+    assert col_meta.has_dictionary_page is False
+    assert col_meta.dictionary_page_offset is None
+    assert col_meta.data_page_offset > 0
+    assert col_meta.total_compressed_size > 0
+    assert col_meta.total_uncompressed_size > 0
+    with pytest.raises(NotImplementedError):
+        col_meta.has_index_page
+    with pytest.raises(NotImplementedError):
+        col_meta.index_page_offset
+
+
+def test_parquet_metadata_lifetime(tempdir):
+    # ARROW-6642 - ensure that chained access keeps parent objects alive
+    table = pa.table({'a': [1, 2, 3]})
+    pq.write_table(table, tempdir / 'test_metadata_segfault.parquet')
+    dataset = pq.ParquetDataset(tempdir / 'test_metadata_segfault.parquet')
+    dataset.pieces[0].get_metadata().row_group(0).column(0).statistics
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize(
+    (
+        'data',
+        'type',
+        'physical_type',
+        'min_value',
+        'max_value',
+        'null_count',
+        'num_values',
+        'distinct_count'
+    ),
+    [
+        ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, 0),
+        ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, 0),
+        ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, 0),
+        ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, 0),
+        ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, 0),
+        ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, 0),
+        ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, 0),
+        ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, 0),
+        (
+            [-1.1, 2.2, 2.3, None, 4.4], pa.float32(),
+            'FLOAT', -1.1, 4.4, 1, 4, 0
+        ),
+        (
+            [-1.1, 2.2, 2.3, None, 4.4], pa.float64(),
+            'DOUBLE', -1.1, 4.4, 1, 4, 0
+        ),
+        (
+            ['', 'b', chr(1000), None, 'aaa'], pa.binary(),
+            'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, 0
+        ),
+        (
+            [True, False, False, True, True], pa.bool_(),
+            'BOOLEAN', False, True, 0, 5, 0
+        ),
+        (
+            [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(),
+            'BYTE_ARRAY', b'\x00', b'b', 1, 4, 0
+        ),
+    ]
+)
+def test_parquet_column_statistics_api(data, type, physical_type, min_value,
+                                       max_value, null_count, num_values,
+                                       distinct_count):
+    df = pd.DataFrame({'data': data})
+    schema = pa.schema([pa.field('data', type)])
+    table = pa.Table.from_pandas(df, schema=schema, safe=False)
+    fileh = make_sample_file(table)
+
+    meta = fileh.metadata
+
+    rg_meta = meta.row_group(0)
+    col_meta = rg_meta.column(0)
+
+    stat = col_meta.statistics
+    assert stat.has_min_max
+    assert _close(type, stat.min, min_value)
+    assert _close(type, stat.max, max_value)
+    assert stat.null_count == null_count
+    assert stat.num_values == num_values
+    # TODO(kszucs): until the parquet-cpp API exposes a HasDistinctCount
+    # method, a missing distinct_count is represented as zero instead of None
+    assert stat.distinct_count == distinct_count
+    assert stat.physical_type == physical_type
+
+
+# ARROW-6339
+@pytest.mark.pandas
+def test_parquet_raise_on_unset_statistics():
+    df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")})
+    meta = make_sample_file(pa.Table.from_pandas(df)).metadata
+
+    assert not meta.row_group(0).column(0).statistics.has_min_max
+    assert meta.row_group(0).column(0).statistics.max is None
+
+
+def _close(type, left, right):
+    if type == pa.float32():
+        return abs(left - right) < 1E-7
+    elif type == pa.float64():
+        return abs(left - right) < 1E-13
+    else:
+        return left == right
+
+
+def test_statistics_convert_logical_types(tempdir):
+    # ARROW-5166, ARROW-4139
+
+    # (min, max, type)
+    cases = [(10, 11164359321221007157, pa.uint64()),
+             (10, 4294967295, pa.uint32()),
+             ("ähnlich", "öffentlich", pa.utf8()),
+             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
+              pa.time32('ms')),
+             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
+              pa.time64('us')),
+             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
+              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
+              pa.timestamp('ms')),
+             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
+              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
+              pa.timestamp('us'))]
+
+    for i, (min_val, max_val, typ) in enumerate(cases):
+        t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)],
+                                 ['col'])
+        path = str(tempdir / ('example{}.parquet'.format(i)))
+        pq.write_table(t, path, version='2.0')
+        pf = pq.ParquetFile(path)
+        stats = pf.metadata.row_group(0).column(0).statistics
+        assert stats.min == min_val
+        assert stats.max == max_val
+
+
+def test_parquet_write_disable_statistics(tempdir):
+    table = pa.Table.from_pydict(
+        OrderedDict([
+            ('a', pa.array([1, 2, 3])),
+            ('b', pa.array(['a', 'b', 'c']))
+        ])
+    )
+    _write_table(table, tempdir / 'data.parquet')
+    meta = pq.read_metadata(tempdir / 'data.parquet')
+    for col in [0, 1]:
+        cc = meta.row_group(0).column(col)
+        assert cc.is_stats_set is True
+        assert cc.statistics is not None
+
+    _write_table(table, tempdir / 'data2.parquet', write_statistics=False)
+    meta = pq.read_metadata(tempdir / 'data2.parquet')
+    for col in [0, 1]:
+        cc = meta.row_group(0).column(col)
+        assert cc.is_stats_set is False
+        assert cc.statistics is None
+
+    _write_table(table, tempdir / 'data3.parquet', write_statistics=['a'])
+    meta = pq.read_metadata(tempdir / 'data3.parquet')
+    cc_a = meta.row_group(0).column(0)
+    cc_b = meta.row_group(0).column(1)
+    assert cc_a.is_stats_set is True
+    assert cc_b.is_stats_set is False
+    assert cc_a.statistics is not None
+    assert cc_b.statistics is None
+
+
+@pytest.mark.pandas
+def test_pass_separate_metadata():
+    # ARROW-471
+    df = alltypes_sample(size=10000)
+
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, compression='snappy', version='2.0')
+
+    buf.seek(0)
+    metadata = pq.read_metadata(buf)
+
+    buf.seek(0)
+
+    fileh = pq.ParquetFile(buf, metadata=metadata)
+
+    tm.assert_frame_equal(df, fileh.read().to_pandas())
+
+
+def test_field_id_metadata():
+    # ARROW-7080
+    table = pa.table([pa.array([1], type='int32'),
+                      pa.array([[]], type=pa.list_(pa.int32())),
+                      pa.array([b'boo'], type='binary')],
+                     ['f0', 'f1', 'f2'])
+
+    bio = pa.BufferOutputStream()
+    pq.write_table(table, bio)
+    contents = bio.getvalue()
+
+    pf = pq.ParquetFile(pa.BufferReader(contents))
+    schema = pf.schema_arrow
+
+    # Expected Parquet schema for reference
+    #
+    # required group field_id=0 schema {
+    #   optional int32 field_id=1 f0;
+    #   optional group field_id=2 f1 (List) {
+    #     repeated group field_id=3 list {
+    #       optional int32 field_id=4 item;
+    #     }
+    #   }
+    #   optional binary field_id=5 f2;
+    # }
+
+    field_name = b'PARQUET:field_id'
+    assert schema[0].metadata[field_name] == b'1'
+
+    list_field = schema[1]
+    assert list_field.metadata[field_name] == b'2'
+
+    list_item_field = list_field.type.value_field
+    assert list_item_field.metadata[field_name] == b'4'
+
+    assert schema[2].metadata[field_name] == b'5'
+
+
+@pytest.mark.pandas
+def test_read_metadata_files(tempdir):

Review comment:
       moved them both over to `test_dataset.py`

##########
File path: python/pyarrow/tests/parquet/test_basic.py
##########
@@ -0,0 +1,1063 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import decimal
+import io
+import os
+
+import numpy as np
+import pyarrow as pa
+import pytest
+from pyarrow import fs
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
+                                          make_sample_file,
+                                          parametrize_legacy_dataset)
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (_read_table, _test_dataframe,
+                                              _write_table)
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+    from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
+                                               dataframe_with_lists)
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+def test_large_binary():
+    data = [b'foo', b'bar'] * 50
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+
+
+@pytest.mark.large_memory
+def test_large_binary_huge():
+    s = b'xy' * 997
+    data = [s] * ((1 << 33) // len(s))
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+        del arr, table
+
+
+@pytest.mark.large_memory
+def test_large_binary_overflow():
+    s = b'x' * (1 << 31)
+    arr = pa.array([s], type=pa.large_binary())
+    table = pa.Table.from_arrays([arr], names=['strs'])
+    for use_dictionary in [False, True]:
+        writer = pa.BufferOutputStream()
+        with pytest.raises(
+                pa.ArrowInvalid,
+                match="Parquet cannot store strings with size 2GB or more"):
+            _write_table(table, writer, use_dictionary=use_dictionary)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dtype', [int, float])
+def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
+    filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
+    data = [pa.array(list(map(dtype, range(5))))]
+    table = pa.Table.from_arrays(data, names=['a'])
+    _write_table(table, filename)
+    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+    for i in range(table.num_columns):
+        col_written = table[i]
+        col_read = table_read[i]
+        assert table.field(i).name == table_read.field(i).name
+        assert col_read.num_chunks == 1
+        data_written = col_written.chunk(0)
+        data_read = col_read.chunk(0)
+        assert data_written.equals(data_read)
+
+
+def test_parquet_invalid_version(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    with pytest.raises(ValueError, match="Unsupported Parquet format version"):
+        _write_table(table, tempdir / 'test_version.parquet', version="2.2")
+    with pytest.raises(ValueError, match="Unsupported Parquet data page " +
+                       "version"):
+        _write_table(table, tempdir / 'test_version.parquet',
+                     data_page_version="2.2")
+
+
+@parametrize_legacy_dataset
+def test_set_data_page_size(use_legacy_dataset):
+    arr = pa.array([1, 2, 3] * 100000)
+    t = pa.Table.from_arrays([arr], names=['f0'])
+
+    # 128K, 512K
+    page_sizes = [2 << 16, 2 << 18]
+    for target_page_size in page_sizes:
+        _check_roundtrip(t, data_page_size=target_page_size,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_chunked_table_write(use_legacy_dataset):
+    # ARROW-232
+    tables = []
+    batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10))
+    tables.append(pa.Table.from_batches([batch] * 3))
+    df, _ = dataframe_with_lists()
+    batch = pa.RecordBatch.from_pandas(df)
+    tables.append(pa.Table.from_batches([batch] * 3))
+
+    for data_page_version in ['1.0', '2.0']:
+        for use_dictionary in [True, False]:
+            for table in tables:
+                _check_roundtrip(
+                    table, version='2.0',
+                    use_legacy_dataset=use_legacy_dataset,
+                    data_page_version=data_page_version,
+                    use_dictionary=use_dictionary)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_memory_map(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'memory_map': True},
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.0')
+    table_read = pq.read_pandas(filename, memory_map=True,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_enable_buffered_stream(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.0')
+    table_read = pq.read_pandas(filename, buffer_size=4096,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_special_chars_filename(tempdir, use_legacy_dataset):
+    table = pa.Table.from_arrays([pa.array([42])], ["ints"])
+    filename = "foo # bar"
+    path = tempdir / filename
+    assert not path.exists()
+    _write_table(table, str(path))
+    assert path.exists()
+    table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.slow
+def test_file_with_over_int16_max_row_groups():
+    # PARQUET-1857: Parquet encryption support introduced an INT16_MAX upper
+    # limit on the number of row groups, but this limit only impacts files with
+    # encrypted row group metadata because of the int16 row group ordinal used
+    # in the Parquet Thrift metadata. Unencrypted files are not impacted, so
+    # this test checks that it works (even if it isn't a good idea)
+    t = pa.table([list(range(40000))], names=['f0'])
+    _check_roundtrip(t, row_group_size=1)
+
+
+@parametrize_legacy_dataset
+def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
+    # Reproduce failure in ARROW-5630
+    typ = pa.list_(pa.field("item", pa.float32(), False))
+    num_rows = 10000
+    t = pa.table([
+        pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
+                  (num_rows // 10)), type=typ)
+    ], ['a'])
+    _check_roundtrip(
+        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multiple_path_types(tempdir, use_legacy_dataset):
+    # Test compatibility with PEP 519 path-like objects
+    path = tempdir / 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+    # Test compatibility with plain string paths
+    path = str(tempdir) + 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.dataset
+@parametrize_legacy_dataset
+@pytest.mark.parametrize("filesystem", [
+    None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
+])
+def test_relative_paths(tempdir, use_legacy_dataset, filesystem):
+    # reading and writing from relative paths
+    table = pa.table({"a": [1, 2, 3]})
+
+    # reading
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    with util.change_cwd(tempdir):
+        result = pq.read_table("data.parquet", filesystem=filesystem,
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    # writing
+    with util.change_cwd(tempdir):
+        pq.write_table(table, "data2.parquet", filesystem=filesystem)
+    result = pq.read_table(tempdir / "data2.parquet")
+    assert result.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_read_non_existing_file(use_legacy_dataset):
+    # ensure we have a proper error message
+    with pytest.raises(FileNotFoundError):
+        pq.read_table('i-am-not-existing.parquet')
+
+
+@parametrize_legacy_dataset
+def test_parquet_read_from_buffer(tempdir, use_legacy_dataset):
+    # reading from a buffer from python's open()
+    table = pa.table({"a": [1, 2, 3]})
+    pq.write_table(table, str(tempdir / "data.parquet"))
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(pa.PythonFile(f),
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+
+    writer = pq.ParquetWriter(out, arrow_table.schema, version='2.0')
+
+    frames = []
+    for i in range(10):
+        df['unique_id'] = i
+        arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+        writer.write_table(arrow_table)
+
+        frames.append(df.copy())
+
+    writer.close()
+
+    buf = out.getvalue()
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@parametrize_legacy_dataset
+def test_byte_stream_split(use_legacy_dataset):
+    # This is only a smoke test.
+    arr_float = pa.array(list(map(float, range(100))))
+    arr_int = pa.array(list(map(int, range(100))))
+    data_float = [arr_float, arr_float]
+    table = pa.Table.from_arrays(data_float, names=['a', 'b'])
+
+    # Check with byte_stream_split for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=False, use_byte_stream_split=True)
+
+    # Check with byte_stream_split for column 'b' and dictionary
+    # for column 'a'.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a'],
+                     use_byte_stream_split=['b'])
+
+    # Check with a collision for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a', 'b'],
+                     use_byte_stream_split=['a', 'b'])
+
+    # Check with mixed column types.
+    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
+                                       names=['a', 'b'])
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=['b'],
+                     use_byte_stream_split=['a'])
+
+    # Try to use the wrong data type with the byte_stream_split encoding.
+    # This should throw an exception.
+    table = pa.Table.from_arrays([arr_int], names=['tmp'])
+    with pytest.raises(IOError):
+        _check_roundtrip(table, expected=table, use_byte_stream_split=True,
+                         use_dictionary=False,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_compression_level(use_legacy_dataset):
+    arr = pa.array(list(map(int, range(1000))))
+    data = [arr, arr]
+    table = pa.Table.from_arrays(data, names=['a', 'b'])
+
+    # Check one compression level.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=1,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check another one to make sure that compression_level=1 does not
+    # coincide with the default one in Arrow.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=5,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression per column
+    _check_roundtrip(table, expected=table,
+                     compression={'a': "gzip", 'b': "snappy"},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression level per column
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level={'a': 2, 'b': 3},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that specifying a compression level for a codec which does not
+    # allow specifying one results in an error.
+    # Uncompressed, snappy, lz4 and lzo do not support specifying a compression
+    # level.
+    # GZIP (zlib) allows specifying a compression level, but up to
+    # version 1.2.11 the valid range is [-1, 9].
+    invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
+                            ("None", 444), ("lzo", 14)]
+    buf = io.BytesIO()
+    for (codec, level) in invalid_combinations:
+        with pytest.raises((ValueError, OSError)):
+            _write_table(table, buf, compression=codec,
+                         compression_level=level)
+
+
+@pytest.mark.pandas
+def test_compare_schemas():
+    df = alltypes_sample(size=10000)
+
+    fileh = make_sample_file(df)
+    fileh2 = make_sample_file(df)
+    fileh3 = make_sample_file(df[df.columns[::2]])
+
+    # ParquetSchema
+    assert isinstance(fileh.schema, pq.ParquetSchema)
+    assert fileh.schema.equals(fileh.schema)
+    assert fileh.schema == fileh.schema
+    assert fileh.schema.equals(fileh2.schema)
+    assert fileh.schema == fileh2.schema
+    assert fileh.schema != 'arbitrary object'
+    assert not fileh.schema.equals(fileh3.schema)
+    assert fileh.schema != fileh3.schema
+
+    # ColumnSchema
+    assert isinstance(fileh.schema[0], pq.ColumnSchema)
+    assert fileh.schema[0].equals(fileh.schema[0])
+    assert fileh.schema[0] == fileh.schema[0]
+    assert not fileh.schema[0].equals(fileh.schema[1])
+    assert fileh.schema[0] != fileh.schema[1]
+    assert fileh.schema[0] != 'arbitrary object'
+
+
+def test_validate_schema_write_table(tempdir):
+    # ARROW-2926
+    simple_fields = [
+        pa.field('POS', pa.uint32()),
+        pa.field('desc', pa.string())
+    ]
+
+    simple_schema = pa.schema(simple_fields)
+
+    # simple_table schema does not match simple_schema
+    simple_from_array = [pa.array([1]), pa.array(['bla'])]
+    simple_table = pa.Table.from_arrays(simple_from_array, ['POS', 'desc'])
+
+    path = tempdir / 'simple_validate_schema.parquet'
+
+    with pq.ParquetWriter(path, simple_schema,
+                          version='2.0',
+                          compression='snappy', flavor='spark') as w:
+        with pytest.raises(ValueError):
+            w.write_table(simple_table)
+
+
+@pytest.mark.pandas
+def test_column_of_arrays(tempdir):
+    df, schema = dataframe_with_arrays()
+
+    filename = tempdir / 'pandas_roundtrip.parquet'
+    arrow_table = pa.Table.from_pandas(df, schema=schema)
+    _write_table(arrow_table, filename, version="2.0", coerce_timestamps='ms')
+    table_read = _read_table(filename)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+def test_column_of_lists(tempdir):
+    df, schema = dataframe_with_lists(parquet_compatible=True)
+
+    filename = tempdir / 'pandas_roundtrip.parquet'
+    arrow_table = pa.Table.from_pandas(df, schema=schema)
+    _write_table(arrow_table, filename, version='2.0')
+    table_read = _read_table(filename)
+    df_read = table_read.to_pandas()
+
+    tm.assert_frame_equal(df, df_read)
+
+
+def test_large_list_records():
+    # This was fixed in PARQUET-1100
+
+    list_lengths = np.random.randint(0, 500, size=50)
+    list_lengths[::10] = 0
+
+    list_values = [list(map(int, np.random.randint(0, 100, size=x)))
+                   if i % 8 else None
+                   for i, x in enumerate(list_lengths)]
+
+    a1 = pa.array(list_values)
+
+    table = pa.Table.from_arrays([a1], ['int_lists'])
+    _check_roundtrip(table)
+
+
+def test_sanitized_spark_field_names():
+    a0 = pa.array([0, 1, 2, 3, 4])
+    name = 'prohib; ,\t{}'
+    table = pa.Table.from_arrays([a0], [name])
+
+    result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'})
+
+    expected_name = 'prohib______'
+    assert result.schema[0].name == expected_name
+
+
+def test_fixed_size_binary():
+    t0 = pa.binary(10)
+    data = [b'fooooooooo', None, b'barooooooo', b'quxooooooo']
+    a0 = pa.array(data, type=t0)
+
+    table = pa.Table.from_arrays([a0],
+                                 ['binary[10]'])
+    _check_roundtrip(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multithreaded_read(use_legacy_dataset):
+    df = alltypes_sample(size=10000)
+
+    table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(table, buf, compression='SNAPPY', version='2.0')
+
+    buf.seek(0)
+    table1 = _read_table(
+        buf, use_threads=True, use_legacy_dataset=use_legacy_dataset)
+
+    buf.seek(0)
+    table2 = _read_table(
+        buf, use_threads=False, use_legacy_dataset=use_legacy_dataset)
+
+    assert table1.equals(table2)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_min_chunksize(use_legacy_dataset):
+    data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
+    table = pa.Table.from_pandas(data.reset_index())
+
+    buf = io.BytesIO()
+    _write_table(table, buf, chunk_size=-1)
+
+    buf.seek(0)
+    result = _read_table(buf, use_legacy_dataset=use_legacy_dataset)
+
+    assert result.equals(table)
+
+    with pytest.raises(ValueError):
+        _write_table(table, buf, chunk_size=0)
+
+
+@pytest.mark.pandas
+def test_read_single_row_group():
+    # ARROW-471
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+
+    pf = pq.ParquetFile(buf)
+
+    assert pf.num_row_groups == K
+
+    row_groups = [pf.read_row_group(i) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    tm.assert_frame_equal(df, result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_single_row_group_with_column_subset():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
+    cols = list(df.columns[:2])
+    row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+    # ARROW-4267: Selection of duplicate columns still leads to these columns
+    # being read uniquely.
+    row_groups = [pf.read_row_group(i, columns=cols + cols) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_multiple_row_groups():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+
+    pf = pq.ParquetFile(buf)
+
+    assert pf.num_row_groups == K
+
+    result = pf.read_row_groups(range(K))
+    tm.assert_frame_equal(df, result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_multiple_row_groups_with_column_subset():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
+    cols = list(df.columns[:2])
+    result = pf.read_row_groups(range(K), columns=cols)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+    # ARROW-4267: Selection of duplicate columns still leads to these columns
+    # being read uniquely.
+    result = pf.read_row_groups(range(K), columns=cols + cols)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_scan_contents():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
+    assert pf.scan_contents() == 10000
+    assert pf.scan_contents(df.columns[:4]) == 10000
+
+
+@pytest.mark.pandas
+def test_write_error_deletes_incomplete_file(tempdir):
+    # ARROW-1285
+    df = pd.DataFrame({'a': list('abc'),
+                       'b': list(range(1, 4)),
+                       'c': np.arange(3, 6).astype('u1'),
+                       'd': np.arange(4.0, 7.0, dtype='float64'),
+                       'e': [True, False, True],
+                       'f': pd.Categorical(list('abc')),
+                       'g': pd.date_range('20130101', periods=3),
+                       'h': pd.date_range('20130101', periods=3,
+                                          tz='US/Eastern'),
+                       'i': pd.date_range('20130101', periods=3, freq='ns')})
+
+    pdf = pa.Table.from_pandas(df)
+
+    filename = tempdir / 'tmp_file'
+    try:
+        _write_table(pdf, filename)
+    except pa.ArrowException:
+        pass
+
+    assert not filename.exists()
+
+
+@parametrize_legacy_dataset
+def test_read_non_existent_file(tempdir, use_legacy_dataset):
+    path = 'non-existent-file.parquet'
+    try:
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+    except Exception as e:
+        assert path in e.args[0]
+
+
+@parametrize_legacy_dataset
+def test_read_table_doesnt_warn(datadir, use_legacy_dataset):
+    with pytest.warns(None) as record:
+        pq.read_table(datadir / 'v0.7.1.parquet',
+                      use_legacy_dataset=use_legacy_dataset)
+
+    assert len(record) == 0
+
+
+@pytest.mark.large_memory
+def test_large_table_int32_overflow():
+    size = np.iinfo('int32').max + 1
+
+    arr = np.ones(size, dtype='uint8')
+
+    parr = pa.array(arr, type=pa.uint8())
+
+    table = pa.Table.from_arrays([parr], names=['one'])
+    f = io.BytesIO()
+    _write_table(table, f)
+
+
+def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs):
+    stream = pa.BufferOutputStream()
+    _write_table(table, stream, **write_kwargs)
+    buf = stream.getvalue()
+    return _read_table(buf, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_byte_array_exactly_2gb(use_legacy_dataset):
+    # Test edge case reported in ARROW-3762
+    val = b'x' * (1 << 10)
+
+    base = pa.array([val] * ((1 << 21) - 1))
+    cases = [
+        [b'x' * 1023],  # 2^31 - 1
+        [b'x' * 1024],  # 2^31
+        [b'x' * 1025]   # 2^31 + 1
+    ]
+    for case in cases:
+        values = pa.chunked_array([base, pa.array(case)])
+        t = pa.table([values], names=['f0'])
+        result = _simple_table_roundtrip(
+            t, use_legacy_dataset=use_legacy_dataset, use_dictionary=False)
+        assert t.equals(result)
+
+
+@pytest.mark.pandas
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_binary_array_overflow_to_chunked(use_legacy_dataset):
+    # ARROW-3762
+
+    # 2^31 + 1 bytes
+    values = [b'x'] + [
+        b'x' * (1 << 20)
+    ] * 2 * (1 << 10)
+    df = pd.DataFrame({'byte_col': values})
+
+    tbl = pa.Table.from_pandas(df, preserve_index=False)
+    read_tbl = _simple_table_roundtrip(
+        tbl, use_legacy_dataset=use_legacy_dataset)
+
+    col0_data = read_tbl[0]
+    assert isinstance(col0_data, pa.ChunkedArray)
+
+    # Split up into 2GB chunks
+    assert col0_data.num_chunks == 2
+
+    assert tbl.equals(read_tbl)
+
+
+@pytest.mark.pandas
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_list_of_binary_large_cell(use_legacy_dataset):
+    # ARROW-4688
+    data = []
+
+    # TODO(wesm): handle chunked children
+    # 2^31 - 1 bytes in a single cell
+    # data.append([b'x' * (1 << 20)] * 2047 + [b'x' * ((1 << 20) - 1)])
+
+    # A little under 2GB in total, in cells each containing approximately 10MB
+    data.extend([[b'x' * 1000000] * 10] * 214)
+
+    arr = pa.array(data)
+    table = pa.Table.from_arrays([arr], ['chunky_cells'])
+    read_table = _simple_table_roundtrip(
+        table, use_legacy_dataset=use_legacy_dataset)
+    assert table.equals(read_table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
+    # ARROW-1684
+    df = pd.DataFrame({
+        'a': [[1, 2, 3], None, [4, 5], []],
+        'b': [[1.], None, None, [6., 7.]],
+    })
+
+    path = str(tempdir / 'nested_convenience.parquet')
+
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    _write_table(table, path)
+
+    read = pq.read_table(
+        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(read.to_pandas(), df[['a']])
+
+    read = pq.read_table(
+        path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(read.to_pandas(), df)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_decimal_roundtrip(tempdir, use_legacy_dataset):
+    num_values = 10
+
+    columns = {}
+    for precision in range(1, 39):
+        for scale in range(0, precision + 1):
+            with util.random_seed(0):
+                random_decimal_values = [
+                    util.randdecimal(precision, scale)
+                    for _ in range(num_values)
+                ]
+            column_name = ('dec_precision_{:d}_scale_{:d}'
+                           .format(precision, scale))
+            columns[column_name] = random_decimal_values
+
+    expected = pd.DataFrame(columns)
+    filename = tempdir / 'decimals.parquet'
+    string_filename = str(filename)
+    table = pa.Table.from_pandas(expected)
+    _write_table(table, string_filename)
+    result_table = _read_table(
+        string_filename, use_legacy_dataset=use_legacy_dataset)
+    result = result_table.to_pandas()
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@pytest.mark.xfail(
+    raises=pa.ArrowException, reason='Parquet does not support negative scale'
+)
+def test_decimal_roundtrip_negative_scale(tempdir):
+    expected = pd.DataFrame({'decimal_num': [decimal.Decimal('1.23E4')]})
+    filename = tempdir / 'decimals.parquet'
+    string_filename = str(filename)
+    t = pa.Table.from_pandas(expected)
+    _write_table(t, string_filename)
+    result_table = _read_table(string_filename)
+    result = result_table.to_pandas()
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_writer_context_obj(tempdir, use_legacy_dataset):
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+
+    with pq.ParquetWriter(out, arrow_table.schema, version='2.0') as writer:
+
+        frames = []
+        for i in range(10):
+            df['unique_id'] = i
+            arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+            writer.write_table(arrow_table)
+
+            frames.append(df.copy())
+
+    buf = out.getvalue()
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_writer_context_obj_with_exception(
+    tempdir, use_legacy_dataset
+):
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+    error_text = 'Artificial Error'
+
+    try:
+        with pq.ParquetWriter(out,
+                              arrow_table.schema,
+                              version='2.0') as writer:
+
+            frames = []
+            for i in range(10):
+                df['unique_id'] = i
+                arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+                writer.write_table(arrow_table)
+                frames.append(df.copy())
+                if i == 5:
+                    raise ValueError(error_text)
+    except Exception as e:
+        assert str(e) == error_text
+
+    buf = out.getvalue()
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_zlib_compression_bug(use_legacy_dataset):
+    # ARROW-3514: "zlib deflate failed, output buffer too small"
+    table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
+    f = io.BytesIO()
+    pq.write_table(table, f, compression='gzip')
+
+    f.seek(0)
+    roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())
+
+
+def test_parquet_file_pass_directory_instead_of_file(tempdir):
+    # ARROW-7208
+    path = tempdir / 'directory'
+    os.mkdir(str(path))
+
+    with pytest.raises(IOError, match="Expected file path"):
+        pq.ParquetFile(path)
+
+
+def test_read_column_invalid_index():
+    table = pa.table([pa.array([4, 5]), pa.array(["foo", "bar"])],
+                     names=['ints', 'strs'])
+    bio = pa.BufferOutputStream()
+    pq.write_table(table, bio)
+    f = pq.ParquetFile(bio.getvalue())
+    assert f.reader.read_column(0).to_pylist() == [4, 5]
+    assert f.reader.read_column(1).to_pylist() == ["foo", "bar"]
+    for index in (-1, 2):
+        with pytest.raises((ValueError, IndexError)):
+            f.reader.read_column(index)
+
+
+@parametrize_legacy_dataset
+def test_parquet_file_too_small(tempdir, use_legacy_dataset):
+    path = str(tempdir / "test.parquet")
+    # TODO(dataset) with datasets API it raises OSError instead
+    with pytest.raises((pa.ArrowInvalid, OSError),
+                       match='size is 0 bytes'):
+        with open(path, 'wb') as f:
+            pass
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+
+    with pytest.raises((pa.ArrowInvalid, OSError),
+                       match='size is 4 bytes'):
+        with open(path, 'wb') as f:
+            f.write(b'ffff')
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.pandas
+def test_filter_before_validate_schema(tempdir, use_legacy_dataset):

Review comment:
       Moved it to `test_dataset.py`

##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pyarrow as pa
+import pytest
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet

Review comment:
       It works for me locally but I have added the mark in each file just in case
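
       For reference, a minimal sketch (hypothetical file, not part of this PR) of how a
       module-level `pytestmark` behaves: pytest applies the mark to every test collected
       from the module that defines it, so `pytest ... -m 'not parquet'` deselects those
       tests, while a `pytestmark` defined only in a shared helper module is not inherited
       by the test files that import from it.

           import pytest

           # Applied to every test collected from this module
           pytestmark = pytest.mark.parquet


           def test_roundtrip_smoke():
               # Carries the 'parquet' mark via the module-level pytestmark above
               assert True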

##########
File path: python/pyarrow/tests/parquet/test_basic.py
##########
@@ -0,0 +1,1063 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import decimal
+import io
+import os
+
+import numpy as np
+import pyarrow as pa
+import pytest
+from pyarrow import fs
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
+                                          make_sample_file,
+                                          parametrize_legacy_dataset)
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (_read_table, _test_dataframe,
+                                              _write_table)
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+    from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
+                                               dataframe_with_lists)
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+def test_large_binary():
+    data = [b'foo', b'bar'] * 50
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+
+
+@pytest.mark.large_memory
+def test_large_binary_huge():
+    s = b'xy' * 997
+    data = [s] * ((1 << 33) // len(s))
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+        del arr, table
+
+
+@pytest.mark.large_memory
+def test_large_binary_overflow():
+    s = b'x' * (1 << 31)
+    arr = pa.array([s], type=pa.large_binary())
+    table = pa.Table.from_arrays([arr], names=['strs'])
+    for use_dictionary in [False, True]:
+        writer = pa.BufferOutputStream()
+        with pytest.raises(
+                pa.ArrowInvalid,
+                match="Parquet cannot store strings with size 2GB or more"):
+            _write_table(table, writer, use_dictionary=use_dictionary)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dtype', [int, float])
+def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
+    filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
+    data = [pa.array(list(map(dtype, range(5))))]
+    table = pa.Table.from_arrays(data, names=['a'])
+    _write_table(table, filename)
+    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+    for i in range(table.num_columns):
+        col_written = table[i]
+        col_read = table_read[i]
+        assert table.field(i).name == table_read.field(i).name
+        assert col_read.num_chunks == 1
+        data_written = col_written.chunk(0)
+        data_read = col_read.chunk(0)
+        assert data_written.equals(data_read)
+
+
+def test_parquet_invalid_version(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    with pytest.raises(ValueError, match="Unsupported Parquet format version"):
+        _write_table(table, tempdir / 'test_version.parquet', version="2.2")
+    with pytest.raises(ValueError, match="Unsupported Parquet data page " +
+                       "version"):
+        _write_table(table, tempdir / 'test_version.parquet',
+                     data_page_version="2.2")
+
+
+@parametrize_legacy_dataset
+def test_set_data_page_size(use_legacy_dataset):
+    arr = pa.array([1, 2, 3] * 100000)
+    t = pa.Table.from_arrays([arr], names=['f0'])
+
+    # 128K, 512K
+    page_sizes = [2 << 16, 2 << 18]
+    for target_page_size in page_sizes:
+        _check_roundtrip(t, data_page_size=target_page_size,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_chunked_table_write(use_legacy_dataset):
+    # ARROW-232
+    tables = []
+    batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10))
+    tables.append(pa.Table.from_batches([batch] * 3))
+    df, _ = dataframe_with_lists()
+    batch = pa.RecordBatch.from_pandas(df)
+    tables.append(pa.Table.from_batches([batch] * 3))
+
+    for data_page_version in ['1.0', '2.0']:
+        for use_dictionary in [True, False]:
+            for table in tables:
+                _check_roundtrip(
+                    table, version='2.0',
+                    use_legacy_dataset=use_legacy_dataset,
+                    data_page_version=data_page_version,
+                    use_dictionary=use_dictionary)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_memory_map(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'memory_map': True},
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.0')
+    table_read = pq.read_pandas(filename, memory_map=True,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_enable_buffered_stream(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.0')
+    table_read = pq.read_pandas(filename, buffer_size=4096,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_special_chars_filename(tempdir, use_legacy_dataset):
+    table = pa.Table.from_arrays([pa.array([42])], ["ints"])
+    filename = "foo # bar"
+    path = tempdir / filename
+    assert not path.exists()
+    _write_table(table, str(path))
+    assert path.exists()
+    table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.slow
+def test_file_with_over_int16_max_row_groups():
+    # PARQUET-1857: Parquet encryption support introduced an INT16_MAX upper
+    # limit on the number of row groups, but this limit only impacts files with
+    # encrypted row group metadata because of the int16 row group ordinal used
+    # in the Parquet Thrift metadata. Unencrypted files are not impacted, so
+    # this test checks that it works (even if it isn't a good idea)
+    t = pa.table([list(range(40000))], names=['f0'])
+    _check_roundtrip(t, row_group_size=1)
+
+
+@parametrize_legacy_dataset
+def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
+    # Reproduce failure in ARROW-5630
+    typ = pa.list_(pa.field("item", pa.float32(), False))
+    num_rows = 10000
+    t = pa.table([
+        pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
+                  (num_rows // 10)), type=typ)
+    ], ['a'])
+    _check_roundtrip(
+        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multiple_path_types(tempdir, use_legacy_dataset):
+    # Test compatibility with PEP 519 path-like objects
+    path = tempdir / 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+    # Test compatibility with plain string paths
+    path = str(tempdir) + 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.dataset
+@parametrize_legacy_dataset
+@pytest.mark.parametrize("filesystem", [
+    None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
+])
+def test_relative_paths(tempdir, use_legacy_dataset, filesystem):
+    # reading and writing from relative paths
+    table = pa.table({"a": [1, 2, 3]})
+
+    # reading
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    with util.change_cwd(tempdir):
+        result = pq.read_table("data.parquet", filesystem=filesystem,
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    # writing
+    with util.change_cwd(tempdir):
+        pq.write_table(table, "data2.parquet", filesystem=filesystem)
+    result = pq.read_table(tempdir / "data2.parquet")
+    assert result.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_read_non_existing_file(use_legacy_dataset):
+    # ensure we have a proper error message
+    with pytest.raises(FileNotFoundError):
+        pq.read_table('i-am-not-existing.parquet')
+
+
+@parametrize_legacy_dataset
+def test_parquet_read_from_buffer(tempdir, use_legacy_dataset):
+    # reading from a buffer from python's open()
+    table = pa.table({"a": [1, 2, 3]})
+    pq.write_table(table, str(tempdir / "data.parquet"))
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(pa.PythonFile(f),
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
+    df = _test_dataframe(100)
+    df['unique_id'] = 0
+
+    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+    out = pa.BufferOutputStream()
+
+    writer = pq.ParquetWriter(out, arrow_table.schema, version='2.0')
+
+    frames = []
+    for i in range(10):
+        df['unique_id'] = i
+        arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+        writer.write_table(arrow_table)
+
+        frames.append(df.copy())
+
+    writer.close()
+
+    buf = out.getvalue()
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+    expected = pd.concat(frames, ignore_index=True)
+    tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@parametrize_legacy_dataset
+def test_byte_stream_split(use_legacy_dataset):
+    # This is only a smoke test.
+    arr_float = pa.array(list(map(float, range(100))))
+    arr_int = pa.array(list(map(int, range(100))))
+    data_float = [arr_float, arr_float]
+    table = pa.Table.from_arrays(data_float, names=['a', 'b'])
+
+    # Check with byte_stream_split for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=False, use_byte_stream_split=True)
+
+    # Check with byte_stream_split for column 'b' and dictionary
+    # for column 'a'.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a'],
+                     use_byte_stream_split=['b'])
+
+    # Check with a collision for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a', 'b'],
+                     use_byte_stream_split=['a', 'b'])
+
+    # Check with mixed column types.
+    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
+                                       names=['a', 'b'])
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=['b'],
+                     use_byte_stream_split=['a'])
+
+    # Try to use the wrong data type with the byte_stream_split encoding.
+    # This should throw an exception.
+    table = pa.Table.from_arrays([arr_int], names=['tmp'])
+    with pytest.raises(IOError):
+        _check_roundtrip(table, expected=table, use_byte_stream_split=True,
+                         use_dictionary=False,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_compression_level(use_legacy_dataset):
+    arr = pa.array(list(map(int, range(1000))))
+    data = [arr, arr]
+    table = pa.Table.from_arrays(data, names=['a', 'b'])
+
+    # Check one compression level.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=1,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check another one to make sure that compression_level=1 does not
+    # coincide with the default one in Arrow.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=5,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression per column
+    _check_roundtrip(table, expected=table,
+                     compression={'a': "gzip", 'b': "snappy"},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression level per column
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level={'a': 2, 'b': 3},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that specifying a compression level for a codec that does not
+    # support one, or specifying an out-of-range level, results in an error.
+    # Uncompressed, snappy, lz4 and lzo do not support specifying a compression
+    # level.
+    # GZIP (zlib) allows specifying a compression level, but up to version
+    # 1.2.11 the valid range is [-1, 9].
+    invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
+                            ("None", 444), ("lzo", 14)]
+    buf = io.BytesIO()
+    for (codec, level) in invalid_combinations:
+        with pytest.raises((ValueError, OSError)):
+            _write_table(table, buf, compression=codec,
+                         compression_level=level)
+
+
+@pytest.mark.pandas
+def test_compare_schemas():

Review comment:
       Moved it to `test_metadata.py`




[GitHub] [arrow] arw2019 commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
arw2019 commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r536199126



##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _test_read_common_metadata_files(fs, base_path):

Review comment:
       For sure - I put this one here because we reuse it in `test_hdfs.py`
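
       As an illustration, a hedged sketch (hypothetical call site, assuming the helper
       keeps this location and its `(fs, base_path)` signature, and that the suite's
       existing `tempdir` fixture is available) of how a module such as `test_hdfs.py`
       could reuse the shared helper; the local filesystem stands in for HDFS here:

           import pytest

           from pyarrow.filesystem import LocalFileSystem
           from pyarrow.tests.parquet.common import (
               _test_read_common_metadata_files)


           @pytest.mark.pandas
           def test_read_common_metadata_files(tempdir):
               # Reuse the shared helper; an HDFS-backed test would pass its
               # own filesystem object and base directory instead.
               fs = LocalFileSystem._get_instance()
               _test_read_common_metadata_files(fs, tempdir)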




[GitHub] [arrow] arw2019 commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
arw2019 commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r536199751



##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _test_read_common_metadata_files(fs, base_path):
+    import pandas as pd
+
+    import pyarrow.parquet as pq
+
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    base_path = str(base_path)
+    data_path = os.path.join(base_path, 'data.parquet')
+
+    table = pa.Table.from_pandas(df)
+
+    with fs.open(data_path, 'wb') as f:
+        _write_table(table, f)
+
+    metadata_path = os.path.join(base_path, '_common_metadata')
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(base_path, filesystem=fs)
+    assert dataset.common_metadata_path == str(metadata_path)
+
+    with fs.open(data_path) as f:
+        common_schema = pq.read_metadata(f).schema
+    assert dataset.schema.equals(common_schema)
+
+    # handle list of one directory
+    dataset2 = pq.ParquetDataset([base_path], filesystem=fs)
+    assert dataset2.schema.equals(dataset.schema)
+
+
+def _random_integers(size, dtype):
+    # We do not generate integers outside the int64 range
+    platform_int_info = np.iinfo('int_')
+    iinfo = np.iinfo(dtype)
+    return np.random.randint(max(iinfo.min, platform_int_info.min),
+                             min(iinfo.max, platform_int_info.max),
+                             size=size).astype(dtype)
+
+
+def _test_dataframe(size=10000, seed=0):
+    import pandas as pd
+
+    np.random.seed(seed)
+    df = pd.DataFrame({
+        'uint8': _random_integers(size, np.uint8),
+        'uint16': _random_integers(size, np.uint16),
+        'uint32': _random_integers(size, np.uint32),
+        'uint64': _random_integers(size, np.uint64),
+        'int8': _random_integers(size, np.int8),
+        'int16': _random_integers(size, np.int16),
+        'int32': _random_integers(size, np.int32),
+        'int64': _random_integers(size, np.int64),
+        'float32': np.random.randn(size).astype(np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'strings': [util.rands(10) for i in range(size)],
+        'all_none': [None] * size,
+        'all_none_category': [None] * size
+    })
+
+    # TODO(PARQUET-1015)
+    # df['all_none_category'] = df['all_none_category'].astype('category')
+    return df
+
+
+def _test_write_to_dataset_with_partitions(base_path,

Review comment:
       Also reused in `test_hdfs.py`




[GitHub] [arrow] arw2019 commented on a change in pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
arw2019 commented on a change in pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#discussion_r536214110



##########
File path: python/pyarrow/tests/parquet/test_multifile_ds.py
##########
@@ -0,0 +1,1662 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       Done




[GitHub] [arrow] arw2019 commented on pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
arw2019 commented on pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#issuecomment-743312926


   > Sorry, we merged another PR which added a test to `to_parquet.py`: #8861. Can you check this is included correctly?
   
   Yes, on rebasing (it's in `test_metadata.py`)


[GitHub] [arrow] jorisvandenbossche closed pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche closed pull request #8816:
URL: https://github.com/apache/arrow/pull/8816


   


[GitHub] [arrow] jorisvandenbossche commented on pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#issuecomment-742517633


   Sorry, we merged another PR which added a test to `to_parquet.py`: https://github.com/apache/arrow/pull/8861. Can you check this is included correctly?


[GitHub] [arrow] jorisvandenbossche commented on pull request #8816: ARROW-9027: [Python][Testing] Split parquet tests into multiple files + clean-up

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #8816:
URL: https://github.com/apache/arrow/pull/8816#issuecomment-749003395


   @arw2019 updated this a bit further, and will merge now. Thanks!
   
   With our workflow policy of rebasing / force pushing, it was basically impossible to review your additional changes ... (not your fault, to be clear! Just a workflow for which the GitHub interface is not made.)
   So I went through the files locally and made a few additional changes.

