You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fs...@apache.org on 2020/05/25 19:20:56 UTC

[arrow] branch master updated: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6716bbd  ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory
6716bbd is described below

commit 6716bbd25ead03ad4774c8d1caa612a8f66e853c
Author: François Saint-Jacques <fs...@gmail.com>
AuthorDate: Mon May 25 15:20:21 2020 -0400

    ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory
    
    This patch adds the option to create a dataset of parquet files via `ParquetDatasetFactory`. It reads a single  `_metadata` parquet file created by systems like Dask and Spark, extract the metadata of all fragments from said file, and populate each fragment with extra statistics for each columns. The `_metadata` file can be created via `pyarrow.parquet.write_metadata`.
    
    When the Scan operation is materialised, the row groups of the ParquetFileFragment are elided with the statistics _before_ reading the original file metadata. If no RowGroups from a file matches the predicate of the Scan, the file will not be read (including the metadata footer), thus avoiding expensive IO calls. The optimisation benefits are inversely proportional to the predicate's selectivity.
    
    ```python
    # With the plain FileSystemDataset
    %timeit t = nyc_tlc_fs_dataset.to_table(filter=da.field('total_amount') > 1000.0, ...)
    1.55 s ± 26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    # With ParquetDatasetFactory
    %timeit t = nyc_tlc_parquet_dataset.to_table(filter=da.field('total_amount') > 1000.0, ...)
    336 ms ± 17.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    ```
    
    - Implement ParquetDatasetFactory
    - Replace ParquetFileFormat::GetRowGroupFragments with
      ParquetFileFragment::SplitByRowGroup (and the corresponding bindings).
    - Add various optimizations, notably in ColumnChunkStatisticsAsExpression.
    - Consolidate RowGroupSkipper logic in ParquetFileFragment::ScanFile
    - Ensure FileMetaData::AppendRowGroups checks for schema equality.
    - Implement dataset._parquet_dataset
    
    Closes #7180 from fsaintjacques/ARROW-8062-parquet-dataset-metadata
    
    Lead-authored-by: François Saint-Jacques <fs...@gmail.com>
    Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
    Signed-off-by: François Saint-Jacques <fs...@gmail.com>
---
 cpp/examples/arrow/dataset-parquet-scan-example.cc |  60 ++-
 cpp/src/arrow/dataset/file_parquet.cc              | 514 ++++++++++++++-------
 cpp/src/arrow/dataset/file_parquet.h               | 172 ++++++-
 cpp/src/arrow/dataset/file_parquet_test.cc         |  25 +-
 cpp/src/arrow/dataset/filter.h                     |   4 +
 cpp/src/arrow/dataset/scanner.cc                   |  38 +-
 cpp/src/arrow/dataset/test_util.h                  |   4 +-
 cpp/src/parquet/arrow/reader.cc                    |   4 +
 cpp/src/parquet/arrow/reader.h                     |   5 +
 cpp/src/parquet/arrow/reader_internal.cc           |   3 +-
 cpp/src/parquet/metadata.cc                        |   4 +
 cpp/src/parquet/metadata.h                         |  20 +-
 python/pyarrow/_dataset.pyx                        | 123 ++++-
 python/pyarrow/_parquet.pxd                        |   2 +-
 python/pyarrow/dataset.py                          |  48 ++
 python/pyarrow/includes/libarrow_dataset.pxd       |  33 +-
 python/pyarrow/parquet.py                          |  56 ++-
 python/pyarrow/tests/test_dataset.py               | 151 ++++--
 18 files changed, 965 insertions(+), 301 deletions(-)

diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc b/cpp/examples/arrow/dataset-parquet-scan-example.cc
index 16d674b..40e1556 100644
--- a/cpp/examples/arrow/dataset-parquet-scan-example.cc
+++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc
@@ -15,11 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstdlib>
-#include <iostream>
-
 #include <arrow/api.h>
-
 #include <arrow/dataset/dataset.h>
 #include <arrow/dataset/discovery.h>
 #include <arrow/dataset/file_base.h>
@@ -27,6 +23,10 @@
 #include <arrow/dataset/filter.h>
 #include <arrow/dataset/scanner.h>
 #include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/path_util.h>
+
+#include <cstdlib>
+#include <iostream>
 
 using arrow::field;
 using arrow::int16;
@@ -73,12 +73,12 @@ std::shared_ptr<fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
   return fs::FileSystemFromUri(uri, path).ValueOrDie();
 }
 
-std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem> fs,
-                                                std::shared_ptr<ds::FileFormat> format,
-                                                std::string path) {
+std::shared_ptr<ds::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format,
+    std::string dir) {
   // Find all files under `path`
   fs::FileSelector s;
-  s.base_dir = path;
+  s.base_dir = dir;
   s.recursive = true;
 
   ds::FileSystemFactoryOptions options;
@@ -97,6 +97,50 @@ std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem>
   return dataset.ValueOrDie();
 }
 
+std::shared_ptr<ds::Dataset> GetParquetDatasetFromMetadata(
+    std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format,
+    std::string metadata_path) {
+  auto factory = ds::ParquetDatasetFactory::Make(metadata_path, fs, format).ValueOrDie();
+  return factory->Finish().ValueOrDie();
+}
+
+std::shared_ptr<ds::Dataset> GetDatasetFromFile(
+    std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format,
+    std::string file) {
+  ds::FileSystemFactoryOptions options;
+  // The factory will try to build a child dataset.
+  auto factory = ds::FileSystemDatasetFactory::Make(fs, {file}, format, options).ValueOrDie();
+
+  // Try to infer a common schema for all files.
+  auto schema = factory->Inspect(conf.inspect_options).ValueOrDie();
+  // Caller can optionally decide another schema as long as it is compatible
+  // with the previous one, e.g. `factory->Finish(compatible_schema)`.
+  auto child = factory->Finish(conf.finish_options).ValueOrDie();
+
+  ds::DatasetVector children{conf.repeat, child};
+  auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();
+}
+
+std::shared_ptr<ds::Dataset> GetDatasetFromPath(
+    std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format,
+    std::string path) {
+  auto info = fs->GetFileInfo(path).ValueOrDie();
+  if (info.IsDirectory()) {
+    return GetDatasetFromDirectory(fs, format, path);
+  }
+
+  auto dirname_basename = arrow::fs::internal::GetAbstractPathParent(path);
+  auto basename = dirname_basename.second;
+
+  if (basename == "_metadata") {
+    return GetParquetDatasetFromMetadata(fs, format, path);
+  }
+
+  return GetDatasetFromFile(fs, format, path);
+}
+
 std::shared_ptr<ds::Scanner> GetScannerFromDataset(std::shared_ptr<ds::Dataset> dataset,
                                                    std::vector<std::string> columns,
                                                    std::shared_ptr<ds::Expression> filter,
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 0c89578..23f45ae 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -18,6 +18,7 @@
 #include "arrow/dataset/file_parquet.h"
 
 #include <memory>
+#include <unordered_map>
 #include <unordered_set>
 #include <utility>
 #include <vector>
@@ -25,6 +26,7 @@
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/filter.h"
 #include "arrow/dataset/scanner.h"
+#include "arrow/filesystem/path_util.h"
 #include "arrow/table.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/range.h"
@@ -44,12 +46,12 @@ using parquet::arrow::StatisticsAsScalars;
 /// \brief A ScanTask backed by a parquet file and a RowGroup within a parquet file.
 class ParquetScanTask : public ScanTask {
  public:
-  ParquetScanTask(int row_group, std::vector<int> column_projection,
+  ParquetScanTask(RowGroupInfo row_group, std::vector<int> column_projection,
                   std::shared_ptr<parquet::arrow::FileReader> reader,
                   std::shared_ptr<ScanOptions> options,
                   std::shared_ptr<ScanContext> context)
       : ScanTask(std::move(options), std::move(context)),
-        row_group_(row_group),
+        row_group_(std::move(row_group)),
         column_projection_(std::move(column_projection)),
         reader_(std::move(reader)) {}
 
@@ -61,13 +63,13 @@ class ParquetScanTask : public ScanTask {
     // Thus the memory incurred by the RecordBatchReader is allocated when
     // Scan is called.
     std::unique_ptr<RecordBatchReader> record_batch_reader;
-    RETURN_NOT_OK(reader_->GetRecordBatchReader({row_group_}, column_projection_,
+    RETURN_NOT_OK(reader_->GetRecordBatchReader({row_group_.id()}, column_projection_,
                                                 &record_batch_reader));
     return IteratorFromReader(std::move(record_batch_reader));
   }
 
  private:
-  int row_group_;
+  RowGroupInfo row_group_;
   std::vector<int> column_projection_;
   // The ScanTask _must_ hold a reference to reader_ because there's no
   // guarantee the producing ParquetScanTaskIterator is still alive. This is a
@@ -102,14 +104,12 @@ static parquet::ReaderProperties MakeReaderProperties(
 }
 
 static parquet::ArrowReaderProperties MakeArrowReaderProperties(
-    const ParquetFileFormat& format, int64_t batch_size,
-    const parquet::ParquetFileReader& reader) {
+    const ParquetFileFormat& format, const parquet::FileMetaData& metadata) {
   parquet::ArrowReaderProperties properties(/* use_threads = */ false);
   for (const std::string& name : format.reader_options.dict_columns) {
-    auto column_index = reader.metadata()->schema()->ColumnIndex(name);
+    auto column_index = metadata.schema()->ColumnIndex(name);
     properties.set_read_dictionary(column_index, true);
   }
-  properties.set_batch_size(batch_size);
   return properties;
 }
 
@@ -136,19 +136,14 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
   }
 
   auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
-  auto field = schema_field.field;
-  auto field_expr = field_ref(field->name());
-
-  // In case of missing statistics, return nothing.
-  if (!column_metadata->is_stats_set()) {
-    return scalar(true);
-  }
-
   auto statistics = column_metadata->statistics();
   if (statistics == nullptr) {
     return scalar(true);
   }
 
+  const auto& field = schema_field.field;
+  auto field_expr = field_ref(field->name());
+
   // Optimize for corner case where all values are nulls
   if (statistics->num_values() == statistics->null_count()) {
     return equal(field_expr, scalar(MakeNullScalar(field->type())));
@@ -163,126 +158,50 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
               less_equal(field_expr, scalar(max)));
 }
 
-static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
-
+static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
+  const auto& fields = manifest.schema_fields;
   ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata));
+  expressions.reserve(fields.size());
+  for (const auto& field : fields) {
+    expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, metadata));
   }
 
   return expressions.empty() ? scalar(true) : and_(expressions);
 }
 
-// Skip RowGroups with a filter and metadata
-class RowGroupSkipper {
- public:
-  static constexpr int kIterationDone = -1;
-
-  RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
-                  parquet::ArrowReaderProperties arrow_properties,
-                  std::shared_ptr<Expression> filter, std::vector<int> row_groups)
-      : metadata_(std::move(metadata)),
-        arrow_properties_(std::move(arrow_properties)),
-        filter_(std::move(filter)),
-        row_group_idx_(0),
-        row_groups_(std::move(row_groups)),
-        num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups()
-                                            : static_cast<int>(row_groups_.size())) {}
-
-  int Next() {
-    while (row_group_idx_ < num_row_groups_) {
-      const int row_group =
-          row_groups_.empty() ? row_group_idx_++ : row_groups_[row_group_idx_++];
-
-      const auto row_group_metadata = metadata_->RowGroup(row_group);
-
-      const int64_t num_rows = row_group_metadata->num_rows();
-      if (CanSkip(*row_group_metadata)) {
-        rows_skipped_ += num_rows;
-        continue;
-      }
-
-      return row_group;
-    }
-
-    return kIterationDone;
-  }
-
- private:
-  bool CanSkip(const parquet::RowGroupMetaData& metadata) const {
-    auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata, arrow_properties_);
-    // Errors with statistics are ignored and post-filtering will apply.
-    if (!maybe_stats_expr.ok()) {
-      return false;
-    }
-
-    auto stats_expr = maybe_stats_expr.ValueOrDie();
-    return !filter_->Assume(stats_expr)->IsSatisfiable();
-  }
-
-  std::shared_ptr<parquet::FileMetaData> metadata_;
-  parquet::ArrowReaderProperties arrow_properties_;
-  std::shared_ptr<Expression> filter_;
-  int row_group_idx_;
-  std::vector<int> row_groups_;
-  int num_row_groups_;
-  int64_t rows_skipped_;
-};
-
 class ParquetScanTaskIterator {
  public:
   static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
                                        std::shared_ptr<ScanContext> context,
-                                       std::unique_ptr<parquet::ParquetFileReader> reader,
-                                       parquet::ArrowReaderProperties arrow_properties,
-                                       const std::vector<int>& row_groups) {
-    auto metadata = reader->metadata();
-
-    auto column_projection = InferColumnProjection(*metadata, arrow_properties, options);
-
-    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
-    RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader),
-                                                   arrow_properties, &arrow_reader));
-
-    RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                            options->filter, row_groups);
-
-    return ScanTaskIterator(ParquetScanTaskIterator(
-        std::move(options), std::move(context), std::move(column_projection),
-        std::move(skipper), std::move(arrow_reader)));
+                                       FileSource source,
+                                       std::unique_ptr<parquet::arrow::FileReader> reader,
+                                       std::vector<RowGroupInfo> row_groups) {
+    auto column_projection = InferColumnProjection(*reader, *options);
+    return static_cast<ScanTaskIterator>(ParquetScanTaskIterator(
+        std::move(options), std::move(context), std::move(source), std::move(reader),
+        std::move(column_projection), std::move(row_groups)));
   }
 
   Result<std::shared_ptr<ScanTask>> Next() {
-    auto row_group = skipper_.Next();
-
-    // Iteration is done.
-    if (row_group == RowGroupSkipper::kIterationDone) {
+    if (idx_ >= row_groups_.size()) {
       return nullptr;
     }
 
+    auto row_group = row_groups_[idx_++];
     return std::shared_ptr<ScanTask>(
         new ParquetScanTask(row_group, column_projection_, reader_, options_, context_));
   }
 
  private:
   // Compute the column projection out of an optional arrow::Schema
-  static std::vector<int> InferColumnProjection(
-      const parquet::FileMetaData& metadata,
-      const parquet::ArrowReaderProperties& arrow_properties,
-      const std::shared_ptr<ScanOptions>& options) {
-    auto maybe_manifest = GetSchemaManifest(metadata, arrow_properties);
-    if (!maybe_manifest.ok()) {
-      return internal::Iota(metadata.num_columns());
-    }
-    auto manifest = std::move(maybe_manifest).ValueOrDie();
-
+  static std::vector<int> InferColumnProjection(const parquet::arrow::FileReader& reader,
+                                                const ScanOptions& options) {
+    auto manifest = reader.manifest();
     // Checks if the field is needed in either the projection or the filter.
-    auto fields_name = options->MaterializedFields();
-    std::unordered_set<std::string> materialized_fields{fields_name.cbegin(),
-                                                        fields_name.cend()};
+    auto field_names = options.MaterializedFields();
+    std::unordered_set<std::string> materialized_fields{field_names.cbegin(),
+                                                        field_names.cend()};
     auto should_materialize_column = [&materialized_fields](const std::string& f) {
       return materialized_fields.find(f) != materialized_fields.end();
     };
@@ -315,20 +234,28 @@ class ParquetScanTaskIterator {
   }
 
   ParquetScanTaskIterator(std::shared_ptr<ScanOptions> options,
-                          std::shared_ptr<ScanContext> context,
-                          std::vector<int> column_projection, RowGroupSkipper skipper,
-                          std::unique_ptr<parquet::arrow::FileReader> reader)
+                          std::shared_ptr<ScanContext> context, FileSource source,
+                          std::unique_ptr<parquet::arrow::FileReader> reader,
+                          std::vector<int> column_projection,
+                          std::vector<RowGroupInfo> row_groups)
       : options_(std::move(options)),
         context_(std::move(context)),
+        source_(std::move(source)),
+        reader_(std::move(reader)),
         column_projection_(std::move(column_projection)),
-        skipper_(std::move(skipper)),
-        reader_(std::move(reader)) {}
+        row_groups_(std::move(row_groups)) {}
 
   std::shared_ptr<ScanOptions> options_;
   std::shared_ptr<ScanContext> context_;
-  std::vector<int> column_projection_;
-  RowGroupSkipper skipper_;
+
+  FileSource source_;
   std::shared_ptr<parquet::arrow::FileReader> reader_;
+
+  std::vector<int> column_projection_;
+  std::vector<RowGroupInfo> row_groups_;
+
+  // row group index.
+  size_t idx_ = 0;
 };
 
 ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) {
@@ -359,19 +286,29 @@ Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
 
 Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
     const FileSource& source) const {
-  auto properties = MakeReaderProperties(*this);
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source));
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  return schema;
+}
+
+Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
+    const FileSource& source, ScanOptions* options, ScanContext* context) const {
+  MemoryPool* pool = context ? context->pool : default_memory_pool();
+  auto properties = MakeReaderProperties(*this, pool);
   ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
 
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
-  RETURN_NOT_OK(parquet::arrow::FileReader::Make(default_memory_pool(), std::move(reader),
-                                                 std::move(arrow_properties),
-                                                 &arrow_reader));
+  auto metadata = reader->metadata();
+  auto arrow_properties = MakeArrowReaderProperties(*this, *metadata);
 
-  std::shared_ptr<Schema> schema;
-  RETURN_NOT_OK(arrow_reader->GetSchema(&schema));
-  return schema;
+  if (options) {
+    arrow_properties.set_batch_size(options->batch_size);
+  }
+
+  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+  RETURN_NOT_OK(parquet::arrow::FileReader::Make(
+      pool, std::move(reader), std::move(arrow_properties), &arrow_reader));
+  return std::move(arrow_reader);
 }
 
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
@@ -380,77 +317,318 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline bool RowGroupInfosAreComplete(const std::vector<RowGroupInfo>& infos) {
+  return !infos.empty() &&
+         std::all_of(infos.cbegin(), infos.cend(),
+                     [](const RowGroupInfo& i) { return i.HasStatistics(); });
+}
+
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  auto filter = [&predicate](const RowGroupInfo& info) {
+    return !info.Satisfy(predicate);
+  };
+  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
+  row_groups.erase(end, row_groups.end());
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
+    std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    if (!info.HasStatistics() && info.id() < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+  std::for_each(row_groups.begin(), row_groups.end(), augment);
+
+  return row_groups;
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (row_groups_are_complete) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
+
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  if (!row_groups_are_complete) {
+    ARROW_ASSIGN_OR_RAISE(row_groups,
+                          AugmentRowGroups(std::move(row_groups), reader.get()));
+    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  }
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
+///
+/// RowGroupInfo
+///
 
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
-
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)),
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  std::vector<RowGroupInfo> row_groups;
+  if (HasCompleteMetadata()) {
+    row_groups = FilterRowGroups(row_groups_, *predicate);
+  } else {
+    ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
+    row_groups = FilterRowGroups(std::move(row_groups), *predicate);
+  }
+
+  FragmentVector fragments;
+  fragments.reserve(row_groups.size());
+  for (auto&& row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid(
+          "Extracting file path from RowGroup failed. RowGroup must have a least one "
+          "columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid(
+          "Extracting file path from RowGroup failed. The column chunks "
+          "file path should be set, but got an empty file path.");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Extracting file path from RowGroup failed. Path '",
+                               column_path, "' not equal to path '", path,
+                               ", for ColumnChunk at index ", i,
+                               "; ColumnChunks in a RowGroup must have the same path.");
+      }
+    }
+
+    return fs::internal::JoinAbstractPath(std::vector<std::string>{base_path, path});
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Extracting file path from RowGroup failed. Parquet threw:",
+                           e.what());
+  }
+}
+
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid(
+          "ParquetDatasetFactory must contain a schema with at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> path_to_row_group_infos;
+
+    ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
+
+    for (int i = 0; i < metadata.num_row_groups(); i++) {
+      auto row_group = metadata.RowGroup(i);
+      ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, *row_group));
+      // Normalizing path is required for Windows.
+      ARROW_ASSIGN_OR_RAISE(path, filesystem_->NormalizePath(std::move(path)));
+      auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+      auto num_rows = row_group->num_rows();
+
+      // Insert the path, or increase the count of row groups. It will be
+      // assumed that the RowGroup of a file are ordered exactly like in
+      // the metadata file.
+      auto elem_and_inserted =
+          path_to_row_group_infos.insert({path, {{0, num_rows, stats}}});
+      if (!elem_and_inserted.second) {
+        auto& path_and_count = *elem_and_inserted.first;
+        auto& row_groups = path_and_count.second;
+        auto row_group_id = static_cast<int>(row_groups.size());
+        path_and_count.second.emplace_back(row_group_id, num_rows, stats);
+      }
+    }
+
+    std::vector<std::shared_ptr<FileFragment>> fragments;
+    fragments.reserve(path_to_row_group_infos.size());
+    for (auto&& elem : path_to_row_group_infos) {
+      ARROW_ASSIGN_OR_RAISE(auto fragment,
+                            format_->MakeFragment({std::move(elem.first), filesystem_},
+                                                  scalar(true), std::move(elem.second)));
+      fragments.push_back(std::move(fragment));
+    }
+
+    return fragments;
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file paths from FileMetaData:", e.what());
+  }
+}
+
+Result<std::shared_ptr<Dataset>> ParquetDatasetFactory::Finish(FinishOptions options) {
+  std::shared_ptr<Schema> schema = options.schema;
+  bool schema_missing = schema == nullptr;
+  if (schema_missing) {
+    ARROW_ASSIGN_OR_RAISE(schema, Inspect(options.inspect_options));
+  }
+
+  auto properties = MakeArrowReaderProperties(*format_, *metadata_);
+  ARROW_ASSIGN_OR_RAISE(auto fragments, CollectParquetFragments(*metadata_, properties));
+  return FileSystemDataset::Make(std::move(schema), scalar(true), format_,
+                                 std::move(fragments));
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 574e9b0..6ac8a1b 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -25,6 +25,7 @@
 #include <utility>
 #include <vector>
 
+#include "arrow/dataset/discovery.h"
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/type_fwd.h"
 #include "arrow/dataset/visibility.h"
@@ -36,11 +37,16 @@ class FileMetaData;
 class FileDecryptionProperties;
 class ReaderProperties;
 class ArrowReaderProperties;
+namespace arrow {
+class FileReader;
+};  // namespace arrow
 }  // namespace parquet
 
 namespace arrow {
 namespace dataset {
 
+class RowGroupInfo;
+
 /// \brief A FileFormat implementation that reads from Parquet files
 class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
  public:
@@ -97,53 +103,177 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return -1.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// This will return true if the RowGroup was not initialized with statistics
+  /// (rather than silently reading metadata for a complete check).
+  bool Satisfy(const Expression& predicate) const;
+
+  /// \brief Indicate if the other RowGroup points to the same RowGroup.
+  bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
+
+ private:
+  int id_;
+  int64_t num_rows_;
+  std::shared_ptr<Expression> statistics_;
 };
 
+/// \brief A FileFragment with parquet logic.
+///
+/// ParquetFileFragment provides a lazy (with respect to IO) interface to
+/// scan parquet files. Any heavy IO calls are deferred to the Scan() method.
+///
+/// The caller can provide an optional list of selected RowGroups to limit the
+/// number of scanned RowGroups, or to partition the scans across multiple
+/// threads.
+///
+/// It can also attach optional statistics with each RowGroups, providing
+/// pushdown predicate benefits before invoking any heavy IO. This can induce
+/// significant performance boost when scanning high latency file systems.
 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
  public:
   Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
                                 std::shared_ptr<ScanContext> context) override;
 
-  /// \brief The row groups viewed by this Fragment. This may be empty which signifies all
-  /// row groups are selected.
-  const std::vector<int>& row_groups() const { return row_groups_; }
+  Result<FragmentVector> SplitByRowGroup(const std::shared_ptr<Expression>& predicate);
+
+  /// \brief Return the RowGroups selected by this fragment. An empty list
+  /// represents all RowGroups in the parquet file.
+  const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; }
+
+  /// \brief Indicate if the attached statistics are complete.
+  ///
+  /// The statistics are complete if the provided RowGroups (see `row_groups()`)
+  /// is not empty / and all RowGroup return true on `RowGroup::HasStatistics()`.
+  bool HasCompleteMetadata() const { return has_complete_metadata_; }
 
  private:
   ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                       std::shared_ptr<Expression> partition_expression,
-                      std::vector<int> row_groups)
-      : FileFragment(std::move(source), std::move(format),
-                     std::move(partition_expression)),
-        row_groups_(std::move(row_groups)) {}
+                      std::vector<RowGroupInfo> row_groups);
 
-  const ParquetFileFormat& parquet_format() const;
-
-  std::vector<int> row_groups_;
+  std::vector<RowGroupInfo> row_groups_;
+  ParquetFileFormat& parquet_format_;
+  bool has_complete_metadata_;
 
   friend class ParquetFileFormat;
 };
 
+/// \brief Create FileSystemDataset from custom `_metadata` cache file.
+///
+/// Dask and other systems will generate a cache metadata file by concatenating
+/// the RowGroupMetaData of multiple parquet files into a single parquet file
+/// that only contains metadata and no ColumnChunk data.
+///
+/// ParquetDatasetFactory creates a FileSystemDataset composed of
+/// ParquetFileFragment where each fragment is pre-populated with the exact
+/// number of row groups and statistics for each columns.
+class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
+ public:
+  /// \brief Create a ParquetDatasetFactory from a metadata path.
+  ///
+  /// The `metadata_path` will be read from `filesystem`. Each RowGroup
+  /// contained in the metadata file will be relative to `dirname(metadata_path)`.
+  ///
+  /// \param[in] metadata_path path of the metadata parquet file
+  /// \param[in] filesystem from which to open/read the path
+  /// \param[in] format to read the file with.
+  static Result<std::shared_ptr<DatasetFactory>> Make(
+      const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+      std::shared_ptr<ParquetFileFormat> format);
+
+  /// \brief Create a ParquetDatasetFactory from a metadata source.
+  ///
+  /// Similar to the previous Make definition, but the metadata can be a Buffer
+  /// and the base_path is explicited instead of inferred from the metadata
+  /// path.
+  ///
+  /// \param[in] metadata source to open the metadata parquet file from
+  /// \param[in] base_path used as the prefix of every parquet files referenced
+  /// \param[in] filesystem from which to read the files referenced.
+  /// \param[in] format to read the file with.
+  static Result<std::shared_ptr<DatasetFactory>> Make(
+      const FileSource& metadata, const std::string& base_path,
+      std::shared_ptr<fs::FileSystem> filesystem,
+      std::shared_ptr<ParquetFileFormat> format);
+
+  Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
+      InspectOptions options) override;
+
+  Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;
+
+ protected:
+  ParquetDatasetFactory(std::shared_ptr<fs::FileSystem> fs,
+                        std::shared_ptr<ParquetFileFormat> format,
+                        std::shared_ptr<parquet::FileMetaData> metadata,
+                        std::string base_path);
+
+  std::shared_ptr<fs::FileSystem> filesystem_;
+  std::shared_ptr<ParquetFileFormat> format_;
+  std::shared_ptr<parquet::FileMetaData> metadata_;
+  std::string base_path_;
+
+ private:
+  Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
+      const parquet::FileMetaData& metadata,
+      const parquet::ArrowReaderProperties& properties);
+};
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 70b0f02..7cb81c2 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -183,18 +183,16 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
                                 std::vector<int> expected_row_groups,
                                 const Expression& filter) {
     auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
-    ASSERT_OK_AND_ASSIGN(auto row_group_fragments,
-                         format_->GetRowGroupFragments(*parquet_fragment, filter.Copy()));
+    ASSERT_OK_AND_ASSIGN(auto fragments, parquet_fragment->SplitByRowGroup(filter.Copy()))
 
-    auto expected_row_group = expected_row_groups.begin();
-    for (auto maybe_fragment : row_group_fragments) {
-      ASSERT_OK_AND_ASSIGN(auto fragment, std::move(maybe_fragment));
-      auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
+    EXPECT_EQ(fragments.size(), expected_row_groups.size());
+    for (size_t i = 0; i < fragments.size(); i++) {
+      auto expected = expected_row_groups[i];
+      auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragments[i]);
 
-      auto i = *expected_row_group++;
-      EXPECT_EQ(parquet_fragment->row_groups(), std::vector<int>{i});
-
-      EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), i + 1);
+      EXPECT_EQ(parquet_fragment->row_groups(),
+                RowGroupInfo::FromIdentifiers({expected}));
+      EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1);
     }
   }
 
@@ -434,7 +432,6 @@ TEST_F(TestParquetFileFormat, PredicatePushdown) {
 
 TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
   constexpr int64_t kNumRowGroups = 16;
-  constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
 
   auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
   auto source = GetFileSource(reader.get());
@@ -442,7 +439,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
   opts_ = ScanOptions::Make(reader->schema());
   ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
 
-  CountRowGroupsInFragment(fragment, internal::Iota(static_cast<int>(kTotalNumRows)),
+  CountRowGroupsInFragment(fragment, internal::Iota(static_cast<int>(kNumRowGroups)),
                            *scalar(true));
 
   for (int i = 0; i < kNumRowGroups; ++i) {
@@ -466,7 +463,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
   CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast<int>(kNumRowGroups)),
                            "i64"_ >= int64_t(6));
 
-  CountRowGroupsInFragment(fragment, {5, 6, 7},
+  CountRowGroupsInFragment(fragment, {5, 6},
                            "i64"_ >= int64_t(6) and "i64"_ < int64_t(8));
 }
 
@@ -492,7 +489,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) {
   // individual selection selects a single row group
   for (int i = 0; i < kNumRowGroups; ++i) {
     CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1);
-    EXPECT_EQ(row_groups_fragment({i})->row_groups(), std::vector<int>{i});
+    EXPECT_EQ(row_groups_fragment({i})->row_groups(), RowGroupInfo::FromIdentifiers({i}));
   }
 
   for (int i = 0; i < kNumRowGroups; ++i) {
diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h
index 436762c..b7d4655 100644
--- a/cpp/src/arrow/dataset/filter.h
+++ b/cpp/src/arrow/dataset/filter.h
@@ -200,6 +200,10 @@ class ARROW_DS_EXPORT Expression {
   ///
   /// This behaves like IsSatisfiable, but it simplifies the current expression
   /// with the given `other` information.
+  bool IsSatisfiableWith(const Expression& other) const {
+    return Assume(other)->IsSatisfiable();
+  }
+
   bool IsSatisfiableWith(const std::shared_ptr<Expression>& other) const {
     return Assume(other)->IsSatisfiable();
   }
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 97b2daf..2207524 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -177,32 +177,38 @@ static inline RecordBatchVector FlattenRecordBatchVector(
   return flattened;
 }
 
+struct TableAssemblyState {
+  /// Protecting mutating accesses to batches
+  std::mutex mutex{};
+  std::vector<RecordBatchVector> batches{};
+
+  void Emplace(RecordBatchVector b, size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= position) {
+      batches.resize(position + 1);
+    }
+    batches[position] = std::move(b);
+  }
+};
+
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
   ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
   auto task_group = scan_context_->TaskGroup();
 
-  // Protecting mutating accesses to batches
-  std::mutex mutex;
-  std::vector<RecordBatchVector> batches;
+  /// Wraps the state in a shared_ptr to ensure that failing ScanTasks don't
+  /// invalidate concurrently running tasks when Finish() early returns
+  /// and the mutex/batches fail out of scope.
+  auto state = std::make_shared<TableAssemblyState>();
+
   size_t scan_task_id = 0;
   for (auto maybe_scan_task : scan_task_it) {
     ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
 
     auto id = scan_task_id++;
-    task_group->Append([&batches, &mutex, id, scan_task] {
+    task_group->Append([state, id, scan_task] {
       ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
-
       ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());
-
-      {
-        // Move into global batches.
-        std::lock_guard<std::mutex> lock(mutex);
-        if (batches.size() <= id) {
-          batches.resize(id + 1);
-        }
-        batches[id] = std::move(local);
-      }
-
+      state->Emplace(std::move(local), id);
       return Status::OK();
     });
   }
@@ -211,7 +217,7 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
   RETURN_NOT_OK(task_group->Finish());
 
   return Table::FromRecordBatches(scan_options_->schema(),
-                                  FlattenRecordBatchVector(std::move(batches)));
+                                  FlattenRecordBatchVector(std::move(state->batches)));
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 75a4561..cc9ae3a 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -449,8 +449,8 @@ struct ArithmeticDatasetFixture {
 
     std::stringstream ss;
     ss << "[\n";
-    for (int64_t i = 0; i < n; i++) {
-      if (i != 0) {
+    for (int64_t i = 1; i <= n; i++) {
+      if (i != 1) {
         ss << "\n,";
       }
       ss << record;
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 129ee84..2a3cf11 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -280,6 +280,10 @@ class FileReaderImpl : public FileReader {
     reader_properties_.set_use_threads(use_threads);
   }
 
+  const ArrowReaderProperties& properties() const override { return reader_properties_; }
+
+  const SchemaManifest& manifest() const override { return manifest_; }
+
   Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                       int64_t* num_rows) override {
     BEGIN_PARQUET_CATCH_EXCEPTIONS
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 8320548..858ff4d 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -46,6 +46,7 @@ namespace arrow {
 
 class ColumnChunkReader;
 class ColumnReader;
+struct SchemaManifest;
 class RowGroupReader;
 
 /// \brief Arrow read adapter class for deserializing Parquet files as Arrow row batches.
@@ -211,6 +212,10 @@ class PARQUET_EXPORT FileReader {
   /// By default only one thread is used.
   virtual void set_use_threads(bool use_threads) = 0;
 
+  virtual const ArrowReaderProperties& properties() const = 0;
+
+  virtual const SchemaManifest& manifest() const = 0;
+
   virtual ~FileReader() = default;
 };
 
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index c706d15..65b453c 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -747,8 +747,7 @@ Status TypedIntegralStatisticsAsScalars(const Statistics& statistics,
       using CType = typename StatisticsType::T;
       return MakeMinMaxScalar<CType, StatisticsType>(statistics, min, max);
     default:
-      return Status::NotImplemented("Cannot extract statistics for type ",
-                                    logical_type->ToString());
+      return Status::NotImplemented("Cannot extract statistics for type ");
   }
 
   return Status::OK();
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 8f424aa..9d21c62 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -652,6 +652,10 @@ class FileMetaData::FileMetaDataImpl {
   }
 
   void AppendRowGroups(const std::unique_ptr<FileMetaDataImpl>& other) {
+    if (!schema()->Equals(*other->schema())) {
+      throw ParquetException("AppendRowGroups requires equal schemas.");
+    }
+
     format::RowGroup other_rg;
     for (int i = 0; i < other->num_row_groups(); i++) {
       other_rg = other->row_group(i);
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index f4482ca..8c26461 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -305,10 +305,26 @@ class PARQUET_EXPORT FileMetaData {
 
   const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
 
-  // Set file_path ColumnChunk fields to a particular value
+  /// \brief Set a path to all ColumnChunk for all RowGroups.
+  ///
+  /// Commonly used by systems (Dask, Spark) who generates an metadata-only
+  /// parquet file. The path is usually relative to said index file.
+  ///
+  /// \param[in] path to set.
   void set_file_path(const std::string& path);
 
-  // Merge row-group metadata from "other" FileMetaData object
+  /// \brief Merge row groups from another metadata file into this one.
+  ///
+  /// The schema of the input FileMetaData must be equal to the
+  /// schema of this object.
+  ///
+  /// This is used by systems who creates an aggregate metadata-only file by
+  /// concatenating the row groups of multiple files. This newly created
+  /// metadata file acts as an index of all available row groups.
+  ///
+  /// \param[in] other FileMetaData to merge the row groups from.
+  ///
+  /// \throws ParquetException if schemas are not equal.
   void AppendRowGroups(const FileMetaData& other);
 
  private:
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 2190009..1eba6ab 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -758,6 +758,42 @@ cdef class FileFragment(Fragment):
         return FileFormat.wrap(self.file_fragment.format())
 
 
+cdef class RowGroupInfo:
+    """A wrapper class for RowGroup information"""
+
+    cdef:
+        CRowGroupInfo info
+
+    def __init__(self, int id):
+        cdef CRowGroupInfo info = CRowGroupInfo(id)
+        self.init(info)
+
+    cdef void init(self, CRowGroupInfo info):
+        self.info = info
+
+    @staticmethod
+    cdef wrap(CRowGroupInfo info):
+        cdef RowGroupInfo self = RowGroupInfo.__new__(RowGroupInfo)
+        self.init(info)
+        return self
+
+    @property
+    def id(self):
+        return self.info.id()
+
+    @property
+    def num_rows(self):
+        return self.info.num_rows()
+
+    def __eq__(self, other):
+        if not isinstance(other, RowGroupInfo):
+            return False
+        cdef:
+            RowGroupInfo row_group = other
+            CRowGroupInfo c_info = row_group.info
+        return self.info.Equals(c_info)
+
+
 cdef class ParquetFileFragment(FileFragment):
     """A Fragment representing a parquet file."""
 
@@ -770,32 +806,42 @@ cdef class ParquetFileFragment(FileFragment):
 
     @property
     def row_groups(self):
-        row_groups = set(self.parquet_file_fragment.row_groups())
-        if len(row_groups) != 0:
-            return row_groups
-        return None
+        cdef:
+            vector[CRowGroupInfo] c_row_groups
+        c_row_groups = self.parquet_file_fragment.row_groups()
+        if c_row_groups.empty():
+            return None
+        return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]
 
-    def get_row_group_fragments(self, Expression extra_filter=None):
+    def split_by_row_group(self, Expression predicate=None):
         """
+        Split the fragment into multiple fragments.
+
         Yield a Fragment wrapping each row group in this ParquetFileFragment.
-        Row groups will be excluded whose metadata contradicts the either the
-        filter provided on construction of this Fragment or the extra_filter
-        argument.
+        Row groups will be excluded whose metadata contradicts the optional
+        predicate.
+
+        Parameters
+        ----------
+        predicate : Expression, default None
+            Exclude RowGroups whose statistics contradicts the predicate.
+
+        Returns
+        -------
+        A list of Fragment.
         """
         cdef:
-            CParquetFileFormat* c_format
-            CFragmentIterator c_fragments
-            shared_ptr[CExpression] c_extra_filter
+            vector[shared_ptr[CFragment]] c_fragments
+            shared_ptr[CExpression] c_predicate
+            shared_ptr[CFragment] c_fragment
 
         schema = self.physical_schema
-        c_extra_filter = _insert_implicit_casts(extra_filter, schema)
-        c_format = <CParquetFileFormat*> self.file_fragment.format().get()
-        c_fragments = move(GetResultValue(c_format.GetRowGroupFragments(deref(
-            self.parquet_file_fragment), move(c_extra_filter))))
-
-        for maybe_fragment in c_fragments:
-            yield Fragment.wrap(GetResultValue(move(maybe_fragment)))
+        c_predicate = _insert_implicit_casts(predicate, schema)
+        with nogil:
+            c_fragments = move(GetResultValue(
+                self.parquet_file_fragment.SplitByRowGroup(move(c_predicate))))
 
+        return [Fragment.wrap(c_fragment) for c_fragment in c_fragments]
 
 cdef class ParquetReadOptions:
     """
@@ -1446,6 +1492,47 @@ cdef class UnionDatasetFactory(DatasetFactory):
         self.union_factory = <CUnionDatasetFactory*> sp.get()
 
 
+cdef class ParquetDatasetFactory(DatasetFactory):
+    """
+    Create a ParquetDatasetFactory from a Parquet `_metadata` file.
+
+    Parameters
+    ----------
+    metadata_path : str
+        Path to the `_metadata` parquet metadata-only file generated with
+        `pyarrow.parquet.write_metadata`.
+    filesystem : pyarrow.fs.FileSystem
+        Filesystem to read the metadata_path from, and subsequent parquet
+        files.
+    format : ParquetFileFormat
+        Parquet format options.
+    """
+
+    cdef:
+        CParquetDatasetFactory* parquet_factory
+
+    def __init__(self, metadata_path, FileSystem filesystem not None,
+                 FileFormat format not None):
+        cdef:
+            c_string path
+            shared_ptr[CFileSystem] c_filesystem
+            shared_ptr[CParquetFileFormat] c_format
+            CResult[shared_ptr[CDatasetFactory]] result
+
+        c_path = tobytes(metadata_path)
+        c_filesystem = filesystem.unwrap()
+        c_format = static_pointer_cast[CParquetFileFormat, CFileFormat](
+            format.unwrap())
+
+        result = CParquetDatasetFactory.MakeFromMetaDataPath(
+            c_path, c_filesystem, c_format)
+        self.init(GetResultValue(result))
+
+    cdef init(self, shared_ptr[CDatasetFactory]& sp):
+        DatasetFactory.init(self, sp)
+        self.parquet_factory = <CParquetDatasetFactory*> sp.get()
+
+
 cdef class ScanTask:
     """Read record batches from a range of a single data fragment.
 
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 2b370b3..5b1317e 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -312,7 +312,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         int num_schema_elements()
 
         void set_file_path(const c_string& path)
-        void AppendRowGroups(const CFileMetaData& other)
+        void AppendRowGroups(const CFileMetaData& other) except +
 
         unique_ptr[CRowGroupMetaData] RowGroup(int i)
         const SchemaDescriptor* schema()
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 189d02a..8c58250 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -35,11 +35,13 @@ from pyarrow._dataset import (  # noqa
     Fragment,
     HivePartitioning,
     IpcFileFormat,
+    ParquetDatasetFactory,
     ParquetFileFormat,
     ParquetFileFragment,
     ParquetReadOptions,
     Partitioning,
     PartitioningFactory,
+    RowGroupInfo,
     Scanner,
     ScanTask,
     UnionDataset,
@@ -443,6 +445,52 @@ def _union_dataset(children, schema=None, **kwargs):
     return UnionDataset(schema, children)
 
 
+def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
+    """
+    Create a FileSystemDataset from a `_metadata` file created via
+    `pyarrrow.parquet.write_metadata`.
+
+    Parameters
+    ----------
+    metadata_path : path,
+        Path pointing to a single file parquet metadata file
+    schema : Schema, optional
+        Optionally provide the Schema for the Dataset, in which case it will
+        not be inferred from the source.
+    filesystem : FileSystem or URI string, default None
+        If a single path is given as source and filesystem is None, then the
+        filesystem will be inferred from the path.
+        If an URI string is passed, then a filesystem object is constructed
+        using the URI's optional path component as a directory prefix. See the
+        examples below.
+        Note that the URIs on Windows must follow 'file:///C:...' or
+        'file:/C:...' patterns.
+    format : ParquetFileFormat
+        An instance of a ParquetFileFormat if special options needs to be
+        passed.
+
+    Returns
+    -------
+    FileSystemDataset
+    """
+    from pyarrow.fs import LocalFileSystem
+
+    if format is None:
+        format = ParquetFileFormat()
+    elif not isinstance(format, ParquetFileFormat):
+        raise ValueError("format argument must be a ParquetFileFormat")
+
+    if filesystem is None:
+        filesystem = LocalFileSystem()
+    else:
+        filesystem, _ = _ensure_filesystem(filesystem)
+
+    metadata_path = _normalize_path(filesystem, _stringify_path(metadata_path))
+
+    factory = ParquetDatasetFactory(metadata_path, filesystem, format)
+    return factory.finish(schema)
+
+
 def dataset(source, schema=None, format=None, filesystem=None,
             partitioning=None, partition_base_dir=None,
             exclude_invalid_files=None, ignore_prefixes=None):
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 0493892..56140bd 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -209,9 +209,20 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         const CFileSource& source() const
         const shared_ptr[CFileFormat]& format() const
 
+    cdef cppclass CRowGroupInfo "arrow::dataset::RowGroupInfo":
+        CRowGroupInfo()
+        CRowGroupInfo(int id)
+        CRowGroupInfo(
+            int id, int64_t n_rows, shared_ptr[CExpression] statistics)
+        int id() const
+        int64_t num_rows() const
+        bint Equals(const CRowGroupInfo& other)
+
     cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
             CFileFragment):
-        const vector[int]& row_groups() const
+        const vector[CRowGroupInfo]& row_groups() const
+        CResult[vector[shared_ptr[CFragment]]] SplitByRowGroup(
+            shared_ptr[CExpression] predicate)
 
     cdef cppclass CFileSystemDataset \
             "arrow::dataset::FileSystemDataset"(CDataset):
@@ -234,9 +245,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
             CFileFormat):
         CParquetFileFormatReaderOptions reader_options
-        CResult[CFragmentIterator] GetRowGroupFragments(
-            const CParquetFileFragment&,
-            shared_ptr[CExpression] extra_filter)
         CResult[shared_ptr[CFileFragment]] MakeFragment(
             CFileSource source,
             shared_ptr[CExpression] partition_expression,
@@ -313,3 +321,20 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
             shared_ptr[CFileFormat] format,
             CFileSystemFactoryOptions options
         )
+
+    cdef cppclass CParquetDatasetFactory \
+            "arrow::dataset::ParquetDatasetFactory"(CDatasetFactory):
+        @staticmethod
+        CResult[shared_ptr[CDatasetFactory]] MakeFromMetaDataPath "Make"(
+            const c_string& metadata_path,
+            shared_ptr[CFileSystem] filesystem,
+            shared_ptr[CParquetFileFormat] format
+        )
+
+        @staticmethod
+        CResult[shared_ptr[CDatasetFactory]] MakeFromMetaDataSource "Make"(
+            const CFileSource& metadata_path,
+            const c_string& base_path,
+            shared_ptr[CFileSystem] filesystem,
+            shared_ptr[CParquetFileFormat] format
+        )
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 51542ee..a71a844 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1742,33 +1742,53 @@ def write_to_dataset(table, root_path, partition_cols=None,
             metadata_collector[-1].set_file_path(outfile)
 
 
-def write_metadata(schema, where, version='1.0',
-                   use_deprecated_int96_timestamps=False,
-                   coerce_timestamps=None):
+def write_metadata(schema, where, metadata_collector=None, **kwargs):
     """
-    Write metadata-only Parquet file from schema.
+    Write metadata-only Parquet file from schema. This can be used with
+    `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
+    files.
 
     Parameters
     ----------
     schema : pyarrow.Schema
     where: string or pyarrow.NativeFile
-    version : {"1.0", "2.0"}, default "1.0"
-        The Parquet format version, defaults to 1.0.
-    use_deprecated_int96_timestamps : bool, default False
-        Write nanosecond resolution timestamps to INT96 Parquet format.
-    coerce_timestamps : str, default None
-        Cast timestamps a particular resolution.
-        Valid values: {None, 'ms', 'us'}.
-    filesystem : FileSystem, default None
-        If nothing passed, paths assumed to be found in the local on-disk
-        filesystem.
+    metadata_collector:
+    **kwargs : dict,
+        Additional kwargs for ParquetWriter class. See docstring for
+        `ParquetWriter` for more information.
+
+    Examples
+    --------
+
+    Write a dataset and collect metadata information.
+
+    >>> metadata_collector = []
+    >>> write_to_dataset(
+    ...     table, root_path,
+    ...     metadata_collector=metadata_collector, **writer_kwargs)
+
+    Write the `_common_metadata` parquet file without row groups statistics.
+
+    >>> write_metadata(
+    ...     table.schema, root_path / '_common_metadata', **writer_kwargs)
+
+    Write the `_metadata` parquet file with row groups statistics.
+
+    >>> write_metadata(
+    ...     table.schema, root_path / '_metadata',
+    ...     metadata_collector=metadata_collector, **writer_kwargs)
     """
-    writer = ParquetWriter(
-        where, schema, version=version,
-        use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
-        coerce_timestamps=coerce_timestamps)
+    writer = ParquetWriter(where, schema, **kwargs)
     writer.close()
 
+    if metadata_collector is not None:
+        # ParquetWriter doesn't expose the metadata until it's written. Write
+        # it and read it again.
+        metadata = read_metadata(where)
+        for m in metadata_collector:
+            metadata.append_row_groups(m)
+        metadata.write_metadata_file(where)
+
 
 def read_metadata(where, memory_map=False):
     """
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index df512f8..a7c7a11 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -250,11 +250,11 @@ def test_filesystem_dataset(mockfs):
         assert isinstance(fragment, ds.ParquetFileFragment)
         assert fragment.row_groups is None
 
-        row_group_fragments = list(fragment.get_row_group_fragments())
+        row_group_fragments = list(fragment.split_by_row_group())
         assert len(row_group_fragments) == 1
         assert isinstance(fragment, ds.ParquetFileFragment)
         assert row_group_fragments[0].path == path
-        assert row_group_fragments[0].row_groups == {0}
+        assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)]
 
     fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
     assert len(fragments) == 2
@@ -552,7 +552,7 @@ def test_make_fragment(multisourcefs):
             assert f.path == path
             assert isinstance(f.filesystem, type(multisourcefs))
         assert fragment.row_groups is None
-        assert row_group_fragment.row_groups == {0}
+        assert row_group_fragment.row_groups == [ds.RowGroupInfo(0)]
 
 
 def _create_dataset_for_fragments(tempdir, chunk_size=None):
@@ -621,18 +621,19 @@ def test_fragments_implicit_cast(tempdir):
     assert len(list(fragments)) == 1
 
 
-@pytest.mark.skip(reason="ARROW-8318")
 @pytest.mark.pandas
 @pytest.mark.parquet
 def test_fragments_reconstruct(tempdir):
     table, dataset = _create_dataset_for_fragments(tempdir)
 
-    def assert_yields_projected(fragment, row_slice, schema):
-        actual = fragment.to_table(schema=schema)
-        assert actual.schema == schema.schema
+    def assert_yields_projected(fragment, row_slice,
+                                columns=None, filter=None):
+        actual = fragment.to_table(
+            schema=table.schema, columns=columns, filter=filter)
+        column_names = columns if columns else table.column_names
+        assert actual.column_names == column_names
 
-        names = schema.names
-        expected = table.slice(*row_slice).to_pandas()[[*names]]
+        expected = table.slice(*row_slice).to_pandas()[[*column_names]]
         assert actual.equals(pa.Table.from_pandas(expected))
 
     fragment = list(dataset.get_fragments())[0]
@@ -643,38 +644,37 @@ def test_fragments_reconstruct(tempdir):
         fragment.path, fragment.filesystem,
         partition_expression=fragment.partition_expression)
     assert new_fragment.to_table().equals(fragment.to_table())
-    assert_yields_projected(new_fragment, (0, 4), table.column_names)
+    assert_yields_projected(new_fragment, (0, 4))
 
     # filter / column projection, inspected schema
     new_fragment = parquet_format.make_fragment(
         fragment.path, fragment.filesystem,
-        columns=['f1'], filter=ds.field('f1') < 2,
         partition_expression=fragment.partition_expression)
-    assert_yields_projected(new_fragment, (0, 2), ['f1'])
+    assert_yields_projected(new_fragment, (0, 2), filter=ds.field('f1') < 2)
 
     # filter requiring cast / column projection, inspected schema
     new_fragment = parquet_format.make_fragment(
         fragment.path, fragment.filesystem,
-        columns=['f1'], filter=ds.field('f1') < 2.0,
         partition_expression=fragment.partition_expression)
-    assert_yields_projected(new_fragment, (0, 2), ['f1'])
+    assert_yields_projected(new_fragment, (0, 2),
+                            columns=['f1'], filter=ds.field('f1') < 2.0)
 
-    # filter on the partition column, explicit schema
+    # filter on the partition column
     new_fragment = parquet_format.make_fragment(
-        fragment.path, fragment.filesystem, schema=dataset.schema,
-        filter=ds.field('part') == 'a',
+        fragment.path, fragment.filesystem,
         partition_expression=fragment.partition_expression)
-    assert_yields_projected(new_fragment, (0, 4), table.column_names)
+    assert_yields_projected(new_fragment, (0, 4),
+                            filter=ds.field('part') == 'a')
 
-    # filter on the partition column, inspected schema
+    # Fragments don't contain the partition's columns if not provided to the
+    # `to_table(schema=...)` method.
     with pytest.raises(ValueError, match="Field named 'part' not found"):
         new_fragment = parquet_format.make_fragment(
             fragment.path, fragment.filesystem,
-            filter=ds.field('part') == 'a',
             partition_expression=fragment.partition_expression)
+        new_fragment.to_table(filter=ds.field('part') == 'a')
 
 
-@pytest.mark.skip(reason="ARROW-8318")
 @pytest.mark.pandas
 @pytest.mark.parquet
 def test_fragments_parquet_row_groups(tempdir):
@@ -683,7 +683,7 @@ def test_fragments_parquet_row_groups(tempdir):
     fragment = list(dataset.get_fragments())[0]
 
     # list and scan row group fragments
-    row_group_fragments = list(fragment.get_row_group_fragments())
+    row_group_fragments = list(fragment.split_by_row_group())
     assert len(row_group_fragments) == 2
     result = row_group_fragments[0].to_table(schema=dataset.schema)
     assert result.column_names == ['f1', 'f2', 'part']
@@ -691,13 +691,12 @@ def test_fragments_parquet_row_groups(tempdir):
     assert result.equals(table.slice(0, 2))
 
     fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0]
-    row_group_fragments = list(fragment.get_row_group_fragments())
+    row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1))
     assert len(row_group_fragments) == 1
     result = row_group_fragments[0].to_table(filter=ds.field('f1') < 1)
     assert len(result) == 1
 
 
-@pytest.mark.skip(reason="ARROW-8318")
 @pytest.mark.pandas
 @pytest.mark.parquet
 def test_fragments_parquet_row_groups_reconstruct(tempdir):
@@ -705,7 +704,7 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir):
 
     fragment = list(dataset.get_fragments())[0]
     parquet_format = fragment.format
-    row_group_fragments = list(fragment.get_row_group_fragments())
+    row_group_fragments = list(fragment.split_by_row_group())
 
     # manually re-construct row group fragments
     new_fragment = parquet_format.make_fragment(
@@ -720,7 +719,7 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir):
         fragment.path, fragment.filesystem,
         partition_expression=fragment.partition_expression,
         row_groups={1})
-    result = new_fragment.to_table(columns=['f1', 'part'],
+    result = new_fragment.to_table(schema=table.schema, columns=['f1', 'part'],
                                    filter=ds.field('f1') < 3, )
     assert result.column_names == ['f1', 'part']
     assert len(result) == 1
@@ -730,7 +729,7 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir):
         fragment.path, fragment.filesystem,
         partition_expression=fragment.partition_expression,
         row_groups={2})
-    with pytest.raises(IndexError, match="trying to scan row group 2"):
+    with pytest.raises(IndexError, match="Trying to scan row group 2"):
         new_fragment.to_table()
 
 
@@ -1441,3 +1440,101 @@ def test_feather_format(tempdir):
     write_feather(table, str(basedir / "data1.feather"), version=1)
     with pytest.raises(ValueError):
         ds.dataset(basedir, format="feather").to_table()
+
+
+def _create_parquet_dataset_simple(root_path):
+    import pyarrow.parquet as pq
+
+    metadata_collector = []
+
+    for i in range(4):
+        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
+        pq.write_to_dataset(
+            table, str(root_path), metadata_collector=metadata_collector
+        )
+
+    metadata_path = str(root_path / '_metadata')
+    # write _metadata file
+    pq.write_metadata(
+        table.schema, metadata_path,
+        metadata_collector=metadata_collector
+    )
+    return metadata_path, table
+
+
+@pytest.mark.parquet
+@pytest.mark.pandas  # write_to_dataset currently requires pandas
+def test_parquet_dataset_factory(tempdir):
+    root_path = tempdir / "test_parquet_dataset"
+    metadata_path, table = _create_parquet_dataset_simple(root_path)
+    dataset = ds.parquet_dataset(metadata_path)
+    assert dataset.schema.equals(table.schema)
+    assert len(dataset.files) == 4
+    result = dataset.to_table()
+    assert result.num_rows == 40
+
+
+@pytest.mark.parquet
+@pytest.mark.pandas
+def test_parquet_dataset_factory_invalid(tempdir):
+    root_path = tempdir / "test_parquet_dataset_invalid"
+    metadata_path, table = _create_parquet_dataset_simple(root_path)
+    # remove one of the files
+    list(root_path.glob("*.parquet"))[0].unlink()
+    dataset = ds.parquet_dataset(metadata_path)
+    assert dataset.schema.equals(table.schema)
+    assert len(dataset.files) == 4
+    with pytest.raises(FileNotFoundError):
+        dataset.to_table()
+
+
+def _create_metadata_file(root_path):
+    # create _metadata file from existing parquet dataset
+    import pyarrow.parquet as pq
+
+    parquet_paths = list(sorted(root_path.rglob("*.parquet")))
+    schema = pq.ParquetFile(parquet_paths[0]).schema.to_arrow_schema()
+
+    metadata_collector = []
+    for path in parquet_paths:
+        metadata = pq.ParquetFile(path).metadata
+        metadata.set_file_path(str(path.relative_to(root_path)))
+        metadata_collector.append(metadata)
+
+    metadata_path = root_path / "_metadata"
+    pq.write_metadata(
+        schema, metadata_path, metadata_collector=metadata_collector
+    )
+    return metadata_path
+
+
+def _create_parquet_dataset_partitioned(root_path):
+    import pyarrow.parquet as pq
+
+    table = pa.table({
+        'f1': range(20), 'f2': np.random.randn(20),
+        'part': np.repeat(['a', 'b'], 10)}
+    )
+    pq.write_to_dataset(table, str(root_path), partition_cols=['part'])
+    return _create_metadata_file(root_path), table
+
+
+@pytest.mark.parquet
+@pytest.mark.pandas
+def test_parquet_dataset_factory_partitioned(tempdir):
+    # TODO support for specifying partitioning scheme
+
+    root_path = tempdir / "test_parquet_dataset_factory_partitioned"
+    metadata_path, table = _create_parquet_dataset_partitioned(root_path)
+
+    dataset = ds.parquet_dataset(metadata_path)
+    # TODO partition column not yet included
+    # assert dataset.schema.equals(table.schema)
+    assert len(dataset.files) == 2
+    result = dataset.to_table()
+    assert result.num_rows == 20
+
+    # the partitioned dataset does not preserve order
+    result = result.to_pandas().sort_values("f1").reset_index(drop=True)
+    expected = table.to_pandas().drop(columns=["part"])
+    pd.testing.assert_frame_equal(result, expected)