You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/05/14 18:35:22 UTC

[GitHub] [arrow] fsaintjacques opened a new pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

fsaintjacques opened a new pull request #7180:
URL: https://github.com/apache/arrow/pull/7180


   - Implement ParquetDatasetFactory
   - Replace ParquetFileFormat::GetRowGroupFragments with
     ParquetFileFragment::SplitByRowGroup (and the corresponding bindings).
   - Add various optimizations, notably in ColumnChunkStatisticsAsExpression.
   - Consolidate RowGroupSkipper logic in ParquetFileFragment::GetRowGroupFragments.
   - Ensure FileMetaData::AppendRowGroups checks for schema equality.
   - Implement dataset._parquet_dataset


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r430367401



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +317,316 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline bool RowGroupInfosAreComplete(const std::vector<RowGroupInfo>& infos) {
+  return !infos.empty() &&
+         std::all_of(infos.cbegin(), infos.cend(),
+                     [](const RowGroupInfo& i) { return i.HasStatistics(); });
+}
+
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  auto filter = [&predicate](const RowGroupInfo& info) {
+    return !info.Satisfy(predicate);
+  };
+  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
+  row_groups.erase(end, row_groups.end());
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
+    std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    if (!info.HasStatistics() && info.id() < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+  std::for_each(row_groups.begin(), row_groups.end(), augment);
+
+  return row_groups;
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (row_groups_are_complete) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
+
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  if (!row_groups_are_complete) {
+    ARROW_ASSIGN_OR_RAISE(row_groups,
+                          AugmentRowGroups(std::move(row_groups), reader.get()));
+    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  }
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
+///
+/// RowGroupInfo
+///
 
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
-
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)),
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  std::vector<RowGroupInfo> row_groups;
+  if (HasCompleteMetadata()) {

Review comment:
       > @jorisvandenbossche this is now lazy.
   
   Cool, thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r427493814



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));

Review comment:
       Correct. It could be filtered, but only if the dataset was generated via the `_metadata` file (or any explicit RowGroupInfo).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428250624



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -315,20 +233,28 @@ class ParquetScanTaskIterator {
   }
 
   ParquetScanTaskIterator(std::shared_ptr<ScanOptions> options,
-                          std::shared_ptr<ScanContext> context,
-                          std::vector<int> column_projection, RowGroupSkipper skipper,
-                          std::unique_ptr<parquet::arrow::FileReader> reader)
+                          std::shared_ptr<ScanContext> context, FileSource source,
+                          std::unique_ptr<parquet::arrow::FileReader> reader,
+                          std::vector<int> column_projection,
+                          std::vector<RowGroupInfo> row_groups)
       : options_(std::move(options)),
         context_(std::move(context)),
+        source_(std::move(source)),
+        reader_(std::move(reader)),
         column_projection_(std::move(column_projection)),
-        skipper_(std::move(skipper)),
-        reader_(std::move(reader)) {}
+        row_groups_(std::move(row_groups)) {}
 
   std::shared_ptr<ScanOptions> options_;
   std::shared_ptr<ScanContext> context_;
-  std::vector<int> column_projection_;
-  RowGroupSkipper skipper_;
+
+  FileSource source_;

Review comment:
       For debug purposes, this is extremely useful to introspect the object.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428183533



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {

Review comment:
       please restrict the scope of try blocks to wrap just the function calls which may throw

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -315,20 +233,28 @@ class ParquetScanTaskIterator {
   }
 
   ParquetScanTaskIterator(std::shared_ptr<ScanOptions> options,
-                          std::shared_ptr<ScanContext> context,
-                          std::vector<int> column_projection, RowGroupSkipper skipper,
-                          std::unique_ptr<parquet::arrow::FileReader> reader)
+                          std::shared_ptr<ScanContext> context, FileSource source,
+                          std::unique_ptr<parquet::arrow::FileReader> reader,
+                          std::vector<int> column_projection,
+                          std::vector<RowGroupInfo> row_groups)
       : options_(std::move(options)),
         context_(std::move(context)),
+        source_(std::move(source)),
+        reader_(std::move(reader)),
         column_projection_(std::move(column_projection)),
-        skipper_(std::move(skipper)),
-        reader_(std::move(reader)) {}
+        row_groups_(std::move(row_groups)) {}
 
   std::shared_ptr<ScanOptions> options_;
   std::shared_ptr<ScanContext> context_;
-  std::vector<int> column_projection_;
-  RowGroupSkipper skipper_;
+
+  FileSource source_;

Review comment:
       What is this used for?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);

Review comment:
       ```suggestion
       row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());

Review comment:
       ```suggestion
         auto row_group = metadata->RowGroup(id);
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> paths_and_row_group_size;
+
+    ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
+
+    for (int i = 0; i < metadata.num_row_groups(); i++) {
+      auto row_group = metadata.RowGroup(i);
+      ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, *row_group));
+      auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+      auto num_rows = row_group->num_rows();
+
+      // Insert the path, or increase the count of row groups. It will be
+      // assumed that the RowGroup of a file are ordered exactly like in
+      // the metadata file.
+      auto elem_and_inserted =
+          paths_and_row_group_size.insert({path, {{0, num_rows, stats}}});
+      if (!elem_and_inserted.second) {
+        auto& path_and_count = *elem_and_inserted.first;
+        auto& row_groups = path_and_count.second;
+        path_and_count.second.emplace_back(row_groups.size(), num_rows, stats);
+      }
+    }
+
+    std::vector<std::shared_ptr<FileFragment>> fragments;
+    for (const auto& elem : paths_and_row_group_size) {

Review comment:
       ```suggestion
       for (auto&& elem : path_to_row_group_info) {
   ```
   std::move doesn't forward to a move cosntructor if the argument is const

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -163,124 +158,47 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
               less_equal(field_expr, scalar(max)));
 }
 
-static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
-
-  ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata));
+static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
+  const auto& fields = manifest.schema_fields;
+  ExpressionVector expressions{fields.size()};
+  for (const auto& field : fields) {
+    expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, metadata));
   }
 
   return expressions.empty() ? scalar(true) : and_(expressions);
 }
 
-// Skip RowGroups with a filter and metadata
-class RowGroupSkipper {
- public:
-  static constexpr int kIterationDone = -1;
-
-  RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
-                  parquet::ArrowReaderProperties arrow_properties,
-                  std::shared_ptr<Expression> filter, std::vector<int> row_groups)
-      : metadata_(std::move(metadata)),
-        arrow_properties_(std::move(arrow_properties)),
-        filter_(std::move(filter)),
-        row_group_idx_(0),
-        row_groups_(std::move(row_groups)),
-        num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups()
-                                            : static_cast<int>(row_groups_.size())) {}
-
-  int Next() {
-    while (row_group_idx_ < num_row_groups_) {
-      const int row_group =
-          row_groups_.empty() ? row_group_idx_++ : row_groups_[row_group_idx_++];
-
-      const auto row_group_metadata = metadata_->RowGroup(row_group);
-
-      const int64_t num_rows = row_group_metadata->num_rows();
-      if (CanSkip(*row_group_metadata)) {
-        rows_skipped_ += num_rows;
-        continue;
-      }
-
-      return row_group;
-    }
-
-    return kIterationDone;
-  }
-
- private:
-  bool CanSkip(const parquet::RowGroupMetaData& metadata) const {
-    auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata, arrow_properties_);
-    // Errors with statistics are ignored and post-filtering will apply.
-    if (!maybe_stats_expr.ok()) {
-      return false;
-    }
-
-    auto stats_expr = maybe_stats_expr.ValueOrDie();
-    return !filter_->Assume(stats_expr)->IsSatisfiable();
-  }
-
-  std::shared_ptr<parquet::FileMetaData> metadata_;
-  parquet::ArrowReaderProperties arrow_properties_;
-  std::shared_ptr<Expression> filter_;
-  int row_group_idx_;
-  std::vector<int> row_groups_;
-  int num_row_groups_;
-  int64_t rows_skipped_;
-};
-
 class ParquetScanTaskIterator {
  public:
   static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,

Review comment:
       Nothing here can fail so we can just make the constructor public

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> paths_and_row_group_size;
+
+    ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
+
+    for (int i = 0; i < metadata.num_row_groups(); i++) {
+      auto row_group = metadata.RowGroup(i);
+      ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, *row_group));
+      auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+      auto num_rows = row_group->num_rows();
+
+      // Insert the path, or increase the count of row groups. It will be
+      // assumed that the RowGroup of a file are ordered exactly like in
+      // the metadata file.
+      auto elem_and_inserted =
+          paths_and_row_group_size.insert({path, {{0, num_rows, stats}}});
+      if (!elem_and_inserted.second) {
+        auto& path_and_count = *elem_and_inserted.first;
+        auto& row_groups = path_and_count.second;
+        path_and_count.second.emplace_back(row_groups.size(), num_rows, stats);
+      }

Review comment:
       You can avoid constructing the vector for failed insertion:
   ```suggestion
         auto it = path_to_row_group_info.insert({path, {{}}}).first;
         auto& row_groups = it->second;
         row_groups.emplace_back(row_groups.size(), num_rows, std::move(stats));
   ```

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -747,8 +747,7 @@ Status TypedIntegralStatisticsAsScalars(const Statistics& statistics,
       using CType = typename StatisticsType::T;
       return MakeMinMaxScalar<CType, StatisticsType>(statistics, min, max);
     default:
-      return Status::NotImplemented("Cannot extract statistics for type ",
-                                    logical_type->ToString());
+      return Status::NotImplemented("Cannot extract statistics for type ");

Review comment:
       Maybe this should return `optional<pair<shared_ptr<Scalar>, shared_ptr<Scalar>>>` instead so that callers can decide whether a verbose error is appropriate? Follow up, in any case

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -163,124 +158,47 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
               less_equal(field_expr, scalar(max)));
 }
 
-static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
-
-  ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata));
+static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
+  const auto& fields = manifest.schema_fields;
+  ExpressionVector expressions{fields.size()};

Review comment:
       ```suggestion
     ExpressionVector expressions;
     expressions.reserve(fields.size());
   ```
   (otherwise the first `fields.size()` entries will be null)

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);

Review comment:
       std algorithms could be used here
   ```suggestion
     auto new_end = std::filter(row_groups.begin(), row_groups.end(), [&](const RowGroupInfo& info) {
       return info.Satisfy(predicate);
     });
     row_groups.erase(new_end, row_groups.end());
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -163,124 +158,47 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
               less_equal(field_expr, scalar(max)));
 }
 
-static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
-
-  ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata));
+static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
+  const auto& fields = manifest.schema_fields;
+  ExpressionVector expressions{fields.size()};
+  for (const auto& field : fields) {
+    expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, metadata));
   }
 
   return expressions.empty() ? scalar(true) : and_(expressions);
 }
 
-// Skip RowGroups with a filter and metadata
-class RowGroupSkipper {
- public:
-  static constexpr int kIterationDone = -1;
-
-  RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
-                  parquet::ArrowReaderProperties arrow_properties,
-                  std::shared_ptr<Expression> filter, std::vector<int> row_groups)
-      : metadata_(std::move(metadata)),
-        arrow_properties_(std::move(arrow_properties)),
-        filter_(std::move(filter)),
-        row_group_idx_(0),
-        row_groups_(std::move(row_groups)),
-        num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups()
-                                            : static_cast<int>(row_groups_.size())) {}
-
-  int Next() {
-    while (row_group_idx_ < num_row_groups_) {
-      const int row_group =
-          row_groups_.empty() ? row_group_idx_++ : row_groups_[row_group_idx_++];
-
-      const auto row_group_metadata = metadata_->RowGroup(row_group);
-
-      const int64_t num_rows = row_group_metadata->num_rows();
-      if (CanSkip(*row_group_metadata)) {
-        rows_skipped_ += num_rows;
-        continue;
-      }
-
-      return row_group;
-    }
-
-    return kIterationDone;
-  }
-
- private:
-  bool CanSkip(const parquet::RowGroupMetaData& metadata) const {
-    auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata, arrow_properties_);
-    // Errors with statistics are ignored and post-filtering will apply.
-    if (!maybe_stats_expr.ok()) {
-      return false;
-    }
-
-    auto stats_expr = maybe_stats_expr.ValueOrDie();
-    return !filter_->Assume(stats_expr)->IsSatisfiable();
-  }
-
-  std::shared_ptr<parquet::FileMetaData> metadata_;
-  parquet::ArrowReaderProperties arrow_properties_;
-  std::shared_ptr<Expression> filter_;
-  int row_group_idx_;
-  std::vector<int> row_groups_;
-  int num_row_groups_;
-  int64_t rows_skipped_;
-};
-
 class ParquetScanTaskIterator {
  public:
   static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
                                        std::shared_ptr<ScanContext> context,
-                                       std::unique_ptr<parquet::ParquetFileReader> reader,
-                                       parquet::ArrowReaderProperties arrow_properties,
-                                       const std::vector<int>& row_groups) {
-    auto metadata = reader->metadata();
-
-    auto column_projection = InferColumnProjection(*metadata, arrow_properties, options);
-
-    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
-    RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader),
-                                                   arrow_properties, &arrow_reader));
-
-    RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                            options->filter, row_groups);
-
-    return ScanTaskIterator(ParquetScanTaskIterator(
-        std::move(options), std::move(context), std::move(column_projection),
-        std::move(skipper), std::move(arrow_reader)));
+                                       FileSource source,
+                                       std::unique_ptr<parquet::arrow::FileReader> reader,
+                                       std::vector<RowGroupInfo> row_groups) {
+    auto column_projection = InferColumnProjection(*reader, *options);
+    return static_cast<ScanTaskIterator>(ParquetScanTaskIterator(
+        std::move(options), std::move(context), std::move(source), std::move(reader),
+        std::move(column_projection), std::move(row_groups)));
   }
 
   Result<std::shared_ptr<ScanTask>> Next() {
-    auto row_group = skipper_.Next();
-
-    // Iteration is done.
-    if (row_group == RowGroupSkipper::kIterationDone) {
+    if (idx_ >= row_groups_.size()) {
       return nullptr;
     }
 
+    auto row_group = row_groups_[idx_++];
     return std::shared_ptr<ScanTask>(
         new ParquetScanTask(row_group, column_projection_, reader_, options_, context_));
   }
 
  private:
   // Compute the column projection out of an optional arrow::Schema
-  static std::vector<int> InferColumnProjection(
-      const parquet::FileMetaData& metadata,
-      const parquet::ArrowReaderProperties& arrow_properties,
-      const std::shared_ptr<ScanOptions>& options) {
-    auto maybe_manifest = GetSchemaManifest(metadata, arrow_properties);
-    if (!maybe_manifest.ok()) {
-      return internal::Iota(metadata.num_columns());
-    }
-    auto manifest = std::move(maybe_manifest).ValueOrDie();
-
+  static std::vector<int> InferColumnProjection(const parquet::arrow::FileReader& reader,
+                                                const ScanOptions& options) {
+    auto manifest = reader.manifest();
     // Checks if the field is needed in either the projection or the filter.
-    auto fields_name = options->MaterializedFields();
+    auto fields_name = options.MaterializedFields();

Review comment:
       ```suggestion
       auto field_names = options.MaterializedFields();
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));

Review comment:
       ```suggestion
                           AugmentAndFilter(std::move(row_groups), *options->filter, reader.get()));
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");

Review comment:
       ```suggestion
         return Status::Invalid("ParquetDatasetFactory must contain a schema with at least one column");
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd
+  bool Satisfy(const Expression& predicate) const;
+
+  /// \brief Indicate if the other RowGroup points to the same RowGroup.
+  bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
+
+ private:
+  int id_;
+  int64_t num_rows_;
+  std::shared_ptr<Expression> statistics_;
 };
 
+/// \brief A FileFragment with parquet logic.
+///
+/// ParquetFileFragment provides a lazy (with respect to IO) interface to
+/// scan parquet files. Any heavy IO calls is deferred in the Scan() method.
+///
+/// The caller can provide an optional list of selected RowGroups to limit the
+/// number of scanned RowGroups, or control parallelism partitioning.

Review comment:
       "or control parallelism partitioning" is not very clear, could you change this into an example use case for limiting the number of scanned RowGroups? (I think that matches your intent here)

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {

Review comment:
       ```suggestion
     for (auto&& row_group : row_groups) {
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -770,31 +806,43 @@ cdef class ParquetFileFragment(FileFragment):
 
     @property
     def row_groups(self):
-        row_groups = set(self.parquet_file_fragment.row_groups())
-        if len(row_groups) != 0:
-            return row_groups
-        return None
+        cdef:
+            vector[CRowGroupInfo] c_row_groups
+        c_row_groups = self.parquet_file_fragment.row_groups()
+        if c_row_groups.empty():
+            return None
+        return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]
 
-    def get_row_group_fragments(self, Expression extra_filter=None):
+    def split_by_row_group(self, Expression predicate=None):
         """
+        Split the fragment in multiple fragments.

Review comment:
       ```suggestion
           Split the fragment into multiple fragments.
   ```
   Could you replicate this comment in c++?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> paths_and_row_group_size;

Review comment:
       ```suggestion
       std::unordered_map<std::string, std::vector<RowGroupInfo>> path_to_row_group_info;
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd
+  bool Satisfy(const Expression& predicate) const;
+
+  /// \brief Indicate if the other RowGroup points to the same RowGroup.
+  bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
+
+ private:
+  int id_;
+  int64_t num_rows_;
+  std::shared_ptr<Expression> statistics_;
 };
 
+/// \brief A FileFragment with parquet logic.
+///
+/// ParquetFileFragment provides a lazy (with respect to IO) interface to
+/// scan parquet files. Any heavy IO calls is deferred in the Scan() method.
+///
+/// The caller can provide an optional list of selected RowGroups to limit the
+/// number of scanned RowGroups, or control parallelism partitioning.
+///
+/// It can also attach optional statistics with each RowGroups, providing
+/// pushdown predicate benefits before invoking any heavy IO. This can induce
+/// significant performance boost when scanning high latency file systems.
 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
  public:
   Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
                                 std::shared_ptr<ScanContext> context) override;
 
-  /// \brief The row groups viewed by this Fragment. This may be empty which signifies all
-  /// row groups are selected.
-  const std::vector<int>& row_groups() const { return row_groups_; }
+  Result<FragmentVector> SplitByRowGroup(const std::shared_ptr<Expression>& predicate);
+
+  /// \brief Return the RowGroups selected by this fragment. An empty list
+  /// represents all RowGroups in the parquet file.
+  const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; }
 
  private:
   ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                       std::shared_ptr<Expression> partition_expression,
-                      std::vector<int> row_groups)
-      : FileFragment(std::move(source), std::move(format),
-                     std::move(partition_expression)),
-        row_groups_(std::move(row_groups)) {}
-
-  const ParquetFileFormat& parquet_format() const;
+                      std::vector<RowGroupInfo> row_groups);
 
-  std::vector<int> row_groups_;
+  std::vector<RowGroupInfo> row_groups_;
+  ParquetFileFormat& parquet_format_;
 
   friend class ParquetFileFormat;
 };
 
+/// \brief Create FileSystemDataset from custom `_metadata` cache file.
+///
+/// Dask and other systems will generate a cache metadata file by concatenating
+/// the RowGroupMetaData of multiple parquet files in a single parquet file.

Review comment:
       ```suggestion
   /// the RowGroupMetaData of multiple parquet files into a single (otherwise
   /// empty) parquet file.
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd

Review comment:
       ```suggestion
     /// This will return true if the RowGroup was not initialized with statistics
     /// (rather than silently reading metadata for a complete check).
   ```
   Probably overkill: this could return `optional<bool>`, with `nullopt` indicating that we have not performed a complete check?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> paths_and_row_group_size;
+
+    ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
+
+    for (int i = 0; i < metadata.num_row_groups(); i++) {
+      auto row_group = metadata.RowGroup(i);
+      ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, *row_group));
+      auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+      auto num_rows = row_group->num_rows();
+
+      // Insert the path, or increase the count of row groups. It will be
+      // assumed that the RowGroup of a file are ordered exactly like in
+      // the metadata file.
+      auto elem_and_inserted =
+          paths_and_row_group_size.insert({path, {{0, num_rows, stats}}});
+      if (!elem_and_inserted.second) {
+        auto& path_and_count = *elem_and_inserted.first;
+        auto& row_groups = path_and_count.second;
+        path_and_count.second.emplace_back(row_groups.size(), num_rows, stats);
+      }
+    }
+
+    std::vector<std::shared_ptr<FileFragment>> fragments;
+    for (const auto& elem : paths_and_row_group_size) {
+      ARROW_ASSIGN_OR_RAISE(auto fragment,
+                            format_->MakeFragment({std::move(elem.first), filesystem_},
+                                                  scalar(true), std::move(elem.second)));
+      fragments.push_back(std::move(fragment));
+    }
+
+    return fragments;
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file paths from FileMetaData:", e.what());
+  }
+}
+
+Result<std::shared_ptr<Dataset>> ParquetDatasetFactory::Finish(FinishOptions options) {
+  std::shared_ptr<Schema> schema = options.schema;
+  bool schema_missing = schema == nullptr;
+  if (schema_missing) {
+    ARROW_ASSIGN_OR_RAISE(schema, Inspect(options.inspect_options));
+  }
+
+  auto properties = MakeArrowReaderProperties(*format_, *metadata_);
+  ARROW_ASSIGN_OR_RAISE(auto fragments, CollectParquetFragments(*metadata_, properties));
+  return FileSystemDataset::Make(std::move(schema), scalar(true), format_, fragments);

Review comment:
       ```suggestion
     return FileSystemDataset::Make(std::move(schema), scalar(true), format_, std::move(fragments));
   ```

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -177,32 +177,38 @@ static inline RecordBatchVector FlattenRecordBatchVector(
   return flattened;
 }
 
+struct TableAssemblyState {
+  /// Protecting mutating accesses to batches
+  std::mutex mutex{};
+  std::vector<RecordBatchVector> batches{};
+
+  void Emplace(RecordBatchVector b, size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= position) {
+      batches.resize(position + 1);
+    }
+    batches[position] = std::move(b);
+  }
+};
+
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
   ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
   auto task_group = scan_context_->TaskGroup();
 
-  // Protecting mutating accesses to batches
-  std::mutex mutex;
-  std::vector<RecordBatchVector> batches;
+  /// Wraps the state in a shared_ptr to ensure that a failing ScanTask don't
+  /// invalidate the concurrent running tasks because Finish() early returns
+  /// and the mutex/batches may got out of scope.

Review comment:
       ```suggestion
     /// Wraps the state in a shared_ptr to ensure that failing ScanTasks don't
     /// invalidate concurrently running tasks when Finish() early returns
     /// and the mutex/batches fall out of scope.
   ```
   nice catch

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -770,31 +806,43 @@ cdef class ParquetFileFragment(FileFragment):
 
     @property
     def row_groups(self):
-        row_groups = set(self.parquet_file_fragment.row_groups())
-        if len(row_groups) != 0:
-            return row_groups
-        return None
+        cdef:
+            vector[CRowGroupInfo] c_row_groups
+        c_row_groups = self.parquet_file_fragment.row_groups()
+        if c_row_groups.empty():
+            return None
+        return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]
 
-    def get_row_group_fragments(self, Expression extra_filter=None):
+    def split_by_row_group(self, Expression predicate=None):
         """
+        Split the fragment in multiple fragments.
+
         Yield a Fragment wrapping each row group in this ParquetFileFragment.
-        Row groups will be excluded whose metadata contradicts the either the
-        filter provided on construction of this Fragment or the extra_filter
-        argument.
+        Row groups will be excluded whose metadata contradicts the optional
+        predicate.
+
+        Parameters
+        ----------
+        predicate : Expression, default None
+            Exclude RowGroups whose statistics contradicts the predicate.
+
+        Returns
+        -------
+        A generator of Fragment.
         """
         cdef:
-            CParquetFileFormat* c_format
-            CFragmentIterator c_fragments
-            shared_ptr[CExpression] c_extra_filter
+            vector[shared_ptr[CFragment]] c_fragments
+            shared_ptr[CExpression] c_predicate
+            shared_ptr[CFragment] c_fragment
 
         schema = self.physical_schema
-        c_extra_filter = _insert_implicit_casts(extra_filter, schema)
-        c_format = <CParquetFileFormat*> self.file_fragment.format().get()
-        c_fragments = move(GetResultValue(c_format.GetRowGroupFragments(deref(
-            self.parquet_file_fragment), move(c_extra_filter))))
+        c_predicate = _insert_implicit_casts(predicate, schema)
+        with nogil:
+            c_fragments = move(GetResultValue(
+                self.parquet_file_fragment.SplitByRowGroup(move(c_predicate))))
 
-        for maybe_fragment in c_fragments:
-            yield Fragment.wrap(GetResultValue(move(maybe_fragment)))
+        for c_fragment in c_fragments:
+            yield Fragment.wrap(c_fragment)

Review comment:
       agreed
   ```suggestion
           return [Fragment.wrap(c_fragment) for c_fragment in c_fragments]
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1446,6 +1494,47 @@ cdef class UnionDatasetFactory(DatasetFactory):
         self.union_factory = <CUnionDatasetFactory*> sp.get()
 
 
+cdef class ParquetDatasetFactory(DatasetFactory):
+    """
+    Create a ParquetDatasetFactory from a Parquet `_metadata` file.
+
+    Parameters
+    ----------
+    metadata_path: str

Review comment:
       ```suggestion
       metadata_path : str
   ```
   Not sure if the doccomment parser actually relies on that space

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd
+  bool Satisfy(const Expression& predicate) const;
+
+  /// \brief Indicate if the other RowGroup points to the same RowGroup.
+  bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
+
+ private:
+  int id_;
+  int64_t num_rows_;
+  std::shared_ptr<Expression> statistics_;
 };
 
+/// \brief A FileFragment with parquet logic.
+///
+/// ParquetFileFragment provides a lazy (with respect to IO) interface to
+/// scan parquet files. Any heavy IO calls is deferred in the Scan() method.

Review comment:
       ```suggestion
   /// scan parquet files. Any heavy IO calls are deferred to the Scan() method.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428669952



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");

Review comment:
       Improved it for all message with the prefix: "Extracting file path from RowGroup failed."




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#issuecomment-630991816


   > I don't get a segault for the test you added, just a wrong exception being throw.
   
   A FileNotFoundError sounds good (the ValueError I added in the tests was just a bit random). Will rebuild locally to see if I still get this
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r429523732



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -1468,6 +1468,7 @@ def test_parquet_dataset_factory(tempdir):
     root_path = tempdir / "test_parquet_dataset"
     metadata_path, table = _create_parquet_dataset_simple(root_path)
     dataset = ds.parquet_dataset(metadata_path)
+    print(root_path.rglob("*"))

Review comment:
       ```suggestion
       print(list(root_path.rglob("*")))
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#issuecomment-631059751


   It seems this failure doesn't happen all the time for me. Running it a few times, I see also see the FileNotFoundError, but in 1 out of 2 cases, approximately. 
   
   Now, when running it on an actual example in the interactive terminal (from a small dataset where I actually deleted one of the files), I consistently see the segfault:
   
   ```
   In [1]: import pyarrow.dataset as ds                                                                                                                                                                               
   
   In [2]: dataset = ds.parquet_dataset("/tmp/tmp9qt6cph5/_metadata")                                                                                                                                                 
   
   In [3]: dataset.to_table()                                                                                                                                                                                         
   terminate called after throwing an instance of 'std::system_error'
     what():  Invalid argument
   Aborted (core dumped)
   ```
   
   I get a different stacktrace when running the tests, something about the "unlink", so it might be that the way I remove the file in the test is not very robust / is not similar to deleting a file in the file browser.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques closed pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques closed pull request #7180:
URL: https://github.com/apache/arrow/pull/7180


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#issuecomment-630985072


   I don't get a segault for the test you added, just a wrong exception being throw.
   
   ```python
   >   raise IOError(errno, message)
   E   FileNotFoundError: [Errno 2] Failed to open local file '/tmp/pytest-of-fsaintjacques/pytest-44/test_parquet_dataset_factory_i0/test_parquet_dataset/43bd0bd1002048e0b9bbc730f7614d18.parquet'. Detail: [errno 2] No such file or directory
   
   pyarrow/error.pxi:98: FileNotFoundError
   
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428677725



##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -747,8 +747,7 @@ Status TypedIntegralStatisticsAsScalars(const Statistics& statistics,
       using CType = typename StatisticsType::T;
       return MakeMinMaxScalar<CType, StatisticsType>(statistics, min, max);
     default:
-      return Status::NotImplemented("Cannot extract statistics for type ",
-                                    logical_type->ToString());
+      return Status::NotImplemented("Cannot extract statistics for type ");

Review comment:
       KISS, we should just extend MakeMinMaxScalar to support the other types.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r427493814



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));

Review comment:
       Correct.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428676760



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -177,32 +177,38 @@ static inline RecordBatchVector FlattenRecordBatchVector(
   return flattened;
 }
 
+struct TableAssemblyState {
+  /// Protecting mutating accesses to batches
+  std::mutex mutex{};
+  std::vector<RecordBatchVector> batches{};
+
+  void Emplace(RecordBatchVector b, size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= position) {
+      batches.resize(position + 1);
+    }
+    batches[position] = std::move(b);
+  }
+};
+
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
   ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
   auto task_group = scan_context_->TaskGroup();
 
-  // Protecting mutating accesses to batches
-  std::mutex mutex;
-  std::vector<RecordBatchVector> batches;
+  /// Wraps the state in a shared_ptr to ensure that a failing ScanTask don't
+  /// invalidate the concurrent running tasks because Finish() early returns
+  /// and the mutex/batches may got out of scope.

Review comment:
       credits goes to @jorisvandenbossche  :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428026117



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));

Review comment:
       I think it would be good to at least try to avoid IO if the statistics are already available in the RowGroupInfo's (at least, from my understanding how this is used in RAPIDS, can check with them), but certainly not critical to do in this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r426804654



##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -747,8 +747,7 @@ Status TypedIntegralStatisticsAsScalars(const Statistics& statistics,
       using CType = typename StatisticsType::T;
       return MakeMinMaxScalar<CType, StatisticsType>(statistics, min, max);
     default:
-      return Status::NotImplemented("Cannot extract statistics for type ",
-                                    logical_type->ToString());
+      return Status::NotImplemented("Cannot extract statistics for type ");

Review comment:
       It turns out that this one gave a non-negligible performance boost because there's a lot of type we don't support, multiply by the number of row group and columns...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r427493504



##########
File path: python/pyarrow/dataset.py
##########
@@ -35,11 +35,13 @@
     Fragment,
     HivePartitioning,
     IpcFileFormat,
+    ParquetDatasetFactory,
     ParquetFileFormat,
     ParquetFileFragment,
     ParquetReadOptions,
     Partitioning,
     PartitioningFactory,
+    RowGroupInfo,

Review comment:
       That's required for `ParquetFileFragment.row_groups`. I could change it to only return a list of integers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428688278



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -177,32 +177,38 @@ static inline RecordBatchVector FlattenRecordBatchVector(
   return flattened;
 }
 
+struct TableAssemblyState {
+  /// Protecting mutating accesses to batches
+  std::mutex mutex{};
+  std::vector<RecordBatchVector> batches{};
+
+  void Emplace(RecordBatchVector b, size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= position) {
+      batches.resize(position + 1);
+    }
+    batches[position] = std::move(b);
+  }
+};
+
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
   ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
   auto task_group = scan_context_->TaskGroup();
 
-  // Protecting mutating accesses to batches
-  std::mutex mutex;
-  std::vector<RecordBatchVector> batches;
+  /// Wraps the state in a shared_ptr to ensure that a failing ScanTask don't
+  /// invalidate the concurrent running tasks because Finish() early returns
+  /// and the mutex/batches may got out of scope.

Review comment:
       Well, the credit to find the cause is not mine, all I did was deleting a file ;)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#issuecomment-631002289


   I'm curious about the exception/segfault. If you can reproduce, feel free to share.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428250038



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -163,124 +158,47 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
               less_equal(field_expr, scalar(max)));
 }
 
-static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
-
-  ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata));
+static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
+  const auto& fields = manifest.schema_fields;
+  ExpressionVector expressions{fields.size()};

Review comment:
       I keep getting caught by this one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#issuecomment-628821011


   https://issues.apache.org/jira/browse/ARROW-8062


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428715816



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +317,316 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline bool RowGroupInfosAreComplete(const std::vector<RowGroupInfo>& infos) {
+  return !infos.empty() &&
+         std::all_of(infos.cbegin(), infos.cend(),
+                     [](const RowGroupInfo& i) { return i.HasStatistics(); });
+}
+
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  auto filter = [&predicate](const RowGroupInfo& info) {
+    return !info.Satisfy(predicate);
+  };
+  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
+  row_groups.erase(end, row_groups.end());
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
+    std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    if (!info.HasStatistics() && info.id() < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+  std::for_each(row_groups.begin(), row_groups.end(), augment);
+
+  return row_groups;
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (row_groups_are_complete) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
+
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  if (!row_groups_are_complete) {
+    ARROW_ASSIGN_OR_RAISE(row_groups,
+                          AugmentRowGroups(std::move(row_groups), reader.get()));
+    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  }
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
+///
+/// RowGroupInfo
+///
 
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
-
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)),
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  std::vector<RowGroupInfo> row_groups;
+  if (HasCompleteMetadata()) {

Review comment:
       @jorisvandenbossche this is now lazy.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428664320



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {

Review comment:
       All parquet calls can throw, it's just simpler otherwise I have variable scope issues wrapped in multiple blocks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r427163007



##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd

Review comment:
       missing word at the end?

##########
File path: python/pyarrow/dataset.py
##########
@@ -443,6 +445,53 @@ def _union_dataset(children, schema=None, **kwargs):
     return UnionDataset(schema, children)
 
 
+def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
+    """
+    Create a FileSystemDataset from a `_metadata` file created via
+    `pyarrrow.parquet.write_metadata`.
+
+    Parameters
+    ----------
+    metadata_path : path,
+        Path pointing to a single file parquet metadata file
+    schema : Schema, optional
+        Optionally provide the Schema for the Dataset, in which case it will
+        not be inferred from the source.
+    filesystem : FileSystem or URI string, default None
+        If a single path is given as source and filesystem is None, then the
+        filesystem will be inferred from the path.
+        If an URI string is passed, then a filesystem object is constructed
+        using the URI's optional path component as a directory prefix. See the
+        examples below.
+        Note that the URIs on Windows must follow 'file:///C:...' or
+        'file:/C:...' patterns.
+    format : ParquetFileFormat
+        An instance of a ParquetFileFormat if special options needs to be
+        passed.
+
+    Returns
+    -------
+    FileSystemDataset
+    """
+    from pyarrow.fs import LocalFileSystem
+
+    if not isinstance(metadata_path, str):
+        raise ValueError("metadata_path argument must be a string")

Review comment:
       ```suggestion
       metadata_path = _stringify_path(metadata_path)
   ```
   
   (then it also handles pathlib.Path objects, and will also raise an error if is not a string / cannot be converted to a string)

##########
File path: cpp/src/parquet/metadata.h
##########
@@ -258,10 +258,26 @@ class PARQUET_EXPORT FileMetaData {
 
   const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
 
-  // Set file_path ColumnChunk fields to a particular value
+  /// \brief Set a path to all ColumnChunk for all RowGroups.
+  ///
+  /// Commonly used by systems (Dask, Spark) who generates an metadata-only
+  /// parquet file. The path are usually relative to said index file.
+  ///
+  /// \param[in] path to set.
   void set_file_path(const std::string& path);
 
-  // Merge row-group metadata from "other" FileMetaData object
+  /// \brief Merge row groups from another metadata file into this one.
+  ///
+  /// The schema of the input FileMetaData must be equal to the
+  /// schema of this object.
+  ///
+  /// This is used by systems who creates an aggregate metadata only file by

Review comment:
       ```suggestion
     /// This is used by systems who creates an aggregate metadata-only file by
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));

Review comment:
       Does this mean that using `SplitByRowGroup` currently always invokes IO? (in principle it could be filtered/split based on the already-read RowGroupInfo objects ?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");

Review comment:
       Or reusing the phrase you used below as well: "Could not infer file paths from FileMetaData: .."

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -770,31 +806,43 @@ cdef class ParquetFileFragment(FileFragment):
 
     @property
     def row_groups(self):
-        row_groups = set(self.parquet_file_fragment.row_groups())
-        if len(row_groups) != 0:
-            return row_groups
-        return None
+        cdef:
+            vector[CRowGroupInfo] c_row_groups
+        c_row_groups = self.parquet_file_fragment.row_groups()
+        if c_row_groups.empty():
+            return None
+        return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]
 
-    def get_row_group_fragments(self, Expression extra_filter=None):
+    def split_by_row_group(self, Expression predicate=None):
         """
+        Split the fragment in multiple fragments.
+
         Yield a Fragment wrapping each row group in this ParquetFileFragment.
-        Row groups will be excluded whose metadata contradicts the either the
-        filter provided on construction of this Fragment or the extra_filter
-        argument.
+        Row groups will be excluded whose metadata contradicts the optional
+        predicate.
+
+        Parameters
+        ----------
+        predicate : Expression, default None
+            Exclude RowGroups whose statistics contradicts the predicate.
+
+        Returns
+        -------
+        A generator of Fragment.
         """
         cdef:
-            CParquetFileFormat* c_format
-            CFragmentIterator c_fragments
-            shared_ptr[CExpression] c_extra_filter
+            vector[shared_ptr[CFragment]] c_fragments
+            shared_ptr[CExpression] c_predicate
+            shared_ptr[CFragment] c_fragment
 
         schema = self.physical_schema
-        c_extra_filter = _insert_implicit_casts(extra_filter, schema)
-        c_format = <CParquetFileFormat*> self.file_fragment.format().get()
-        c_fragments = move(GetResultValue(c_format.GetRowGroupFragments(deref(
-            self.parquet_file_fragment), move(c_extra_filter))))
+        c_predicate = _insert_implicit_casts(predicate, schema)
+        with nogil:
+            c_fragments = move(GetResultValue(
+                self.parquet_file_fragment.SplitByRowGroup(move(c_predicate))))
 
-        for maybe_fragment in c_fragments:
-            yield Fragment.wrap(GetResultValue(move(maybe_fragment)))
+        for c_fragment in c_fragments:
+            yield Fragment.wrap(c_fragment)

Review comment:
       I am wondering, is it still needed to have this yield (generator) instead of returning a list of fragments? As by now, all the heavy work is already done, I think? (the potentially reading of parquet metadata and filtering, since the `SplitByRowGroup` is not a lazy iterator on the C++ side)

##########
File path: python/pyarrow/dataset.py
##########
@@ -35,11 +35,13 @@
     Fragment,
     HivePartitioning,
     IpcFileFormat,
+    ParquetDatasetFactory,
     ParquetFileFormat,
     ParquetFileFragment,
     ParquetReadOptions,
     Partitioning,
     PartitioningFactory,
+    RowGroupInfo,

Review comment:
       Maybe we should not expose this publicly? (is there a reason you would want to use this directly?)

##########
File path: cpp/src/parquet/metadata.h
##########
@@ -258,10 +258,26 @@ class PARQUET_EXPORT FileMetaData {
 
   const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
 
-  // Set file_path ColumnChunk fields to a particular value
+  /// \brief Set a path to all ColumnChunk for all RowGroups.
+  ///
+  /// Commonly used by systems (Dask, Spark) who generates an metadata-only
+  /// parquet file. The path are usually relative to said index file.

Review comment:
       ```suggestion
     /// parquet file. The path is usually relative to said index file.
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.

Review comment:
       On line 134 above, it seems that the default is -1 when the number of rows is not specified / not known (and not 0) ?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
-                                " only has ", reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");

Review comment:
       It might be useful to have a bit longer / more explicit error message here (I actually ran into this, because an older version of dask was creating wrong metadata .. (fixed some time ago), and at first I didn't understand what is was about).
   
   Something like: *"Invalid metadata file for constructing a Parquet dataset: the column chunks file path should be set, but got empty file path."*, or something like this

##########
File path: python/pyarrow/parquet.py
##########
@@ -1739,33 +1739,53 @@ def write_to_dataset(table, root_path, partition_cols=None,
             metadata_collector[-1].set_file_path(outfile)
 
 
-def write_metadata(schema, where, version='1.0',
-                   use_deprecated_int96_timestamps=False,
-                   coerce_timestamps=None):
+def write_metadata(schema, where, metadata_collector=None, **kwargs):
     """
-    Write metadata-only Parquet file from schema.
+    Write metadata-only Parquet file from schema. This can be used with
+    `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
+    files.
 
     Parameters
     ----------
     schema : pyarrow.Schema
     where: string or pyarrow.NativeFile
-    version : {"1.0", "2.0"}, default "1.0"
-        The Parquet format version, defaults to 1.0.
-    use_deprecated_int96_timestamps : bool, default False
-        Write nanosecond resolution timestamps to INT96 Parquet format.
-    coerce_timestamps : str, default None
-        Cast timestamps a particular resolution.
-        Valid values: {None, 'ms', 'us'}.
-    filesystem : FileSystem, default None
-        If nothing passed, paths assumed to be found in the local on-disk
-        filesystem.
+    metadata_collector:
+    **kwargs : dict,
+        Additional kwargs for ParquetWriter class. See docstring for
+        `ParquetWriter` for more information.
+
+    Examples
+    --------
+
+    Write a dataset and collect metadata information.
+
+    >>> metadata_collector=[]

Review comment:
       ```suggestion
       >>> metadata_collector = []
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org