Posted to commits@arrow.apache.org by fs...@apache.org on 2020/03/27 16:52:57 UTC

[arrow] branch master updated: ARROW-8061: [C++][Dataset] Provide RowGroup fragments for ParquetFileFormat

This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ca1706  ARROW-8061: [C++][Dataset] Provide RowGroup fragments for ParquetFileFormat
2ca1706 is described below

commit 2ca1706625eb73b75e004f87eaf6bc038f2af813
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Fri Mar 27 12:52:33 2020 -0400

    ARROW-8061: [C++][Dataset] Provide RowGroup fragments for ParquetFileFormat
    
    Provides ParquetFileFragment, which may view a subset of row groups within a Parquet file. The indices of the viewed row groups are available through the `row_groups()` property, which is exposed to Python. Construction of subset-viewing ParquetFileFragments is likewise exposed to Python, via `ParquetFileFormat.make_fragment(..., row_groups=...)`.
    
    Closes #6670 from bkietz/8061-Ability-to-specify-granul
    
    Authored-by: Benjamin Kietzman <be...@gmail.com>
    Signed-off-by: François Saint-Jacques <fs...@gmail.com>
---
 cpp/src/arrow/dataset/file_base.cc           |   4 +-
 cpp/src/arrow/dataset/file_base.h            |  11 +-
 cpp/src/arrow/dataset/file_parquet.cc        | 103 +++++++++++--
 cpp/src/arrow/dataset/file_parquet.h         |  49 ++++++
 cpp/src/arrow/dataset/file_parquet_test.cc   | 142 ++++++++++++++++--
 cpp/src/arrow/dataset/filter.cc              |   2 +
 cpp/src/arrow/dataset/scanner.cc             |  92 +++++-------
 cpp/src/arrow/dataset/scanner.h              |   8 +
 cpp/src/arrow/dataset/scanner_internal.h     |  29 +++-
 cpp/src/arrow/dataset/test_util.h            |   2 +-
 cpp/src/arrow/dataset/type_fwd.h             |   9 ++
 cpp/src/arrow/filesystem/filesystem.h        |   2 +-
 python/pyarrow/_dataset.pyx                  | 214 ++++++++++++++++++++++++++-
 python/pyarrow/dataset.py                    |   1 +
 python/pyarrow/includes/libarrow_dataset.pxd |  44 +++++-
 python/pyarrow/includes/libarrow_fs.pxd      |   1 +
 python/pyarrow/tests/test_dataset.py         | 111 ++++++++++----
 17 files changed, 683 insertions(+), 141 deletions(-)
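
The headline addition is construction of Parquet fragments restricted to a set of row groups. A minimal sketch of the new C++ surface (illustrative, not part of the commit; it uses only signatures that appear in the diff below and assumes an existing FileSystem `fs`, a Parquet file at `path`, and that file's `schema`):

    #include <memory>
    #include <string>
    #include <utility>
    #include <vector>

    #include "arrow/dataset/api.h"
    #include "arrow/filesystem/api.h"

    namespace ds = arrow::dataset;

    arrow::Result<std::shared_ptr<arrow::Table>> ScanOneRowGroup(
        const std::shared_ptr<arrow::fs::FileSystem>& fs,
        const std::string& path,
        const std::shared_ptr<arrow::Schema>& schema) {
      auto format = std::make_shared<ds::ParquetFileFormat>();
      auto options = ds::ScanOptions::Make(schema);
      // View only row group 1; an empty vector would select all row groups.
      ARROW_ASSIGN_OR_RAISE(
          auto fragment,
          format->MakeFragment(ds::FileSource(path, fs.get()), options,
                               ds::scalar(true), std::vector<int>{1}));
      auto context = std::make_shared<ds::ScanContext>();
      ARROW_ASSIGN_OR_RAISE(auto scan_tasks, fragment->Scan(context));
      return ds::ScanTask::ToTable(options, context, std::move(scan_tasks));
    }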

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 1da3dd7..75df308 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -63,8 +63,8 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
 Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
     FileSource source, std::shared_ptr<ScanOptions> options,
     std::shared_ptr<Expression> partition_expression) {
-  return std::make_shared<FileFragment>(std::move(source), shared_from_this(), options,
-                                        std::move(partition_expression));
+  return std::shared_ptr<FileFragment>(new FileFragment(
+      std::move(source), shared_from_this(), options, std::move(partition_expression)));
 }
 
 Result<std::shared_ptr<WriteTask>> FileFormat::WriteFragment(
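
The switch from std::make_shared to a bare new above is forced by the file_base.h hunk that follows, which makes FileFragment's constructor protected: std::make_shared allocates and constructs outside the class, so it cannot reach a protected constructor, while a member factory can call new directly. A stand-alone illustration of the pattern, with generic names rather than Arrow's:

    #include <memory>

    class Widget {
     public:
      // A member (or friend) factory may use the protected constructor.
      static std::shared_ptr<Widget> Make(int v) {
        // std::make_shared<Widget>(v) would not compile here: construction
        // happens inside std::make_shared, which is not a friend of Widget.
        return std::shared_ptr<Widget>(new Widget(v));
      }

     protected:
      explicit Widget(int v) : v_(v) {}
      int v_;
    };

    int main() { auto w = Widget::Make(42); }
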
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 8571eca..157a425 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -162,19 +162,13 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
  public:
   Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;
 
-  // XXX should this include format_->type_name?
-  std::string type_name() const override { return "file"; }
+  std::string type_name() const override { return format_->type_name(); }
   bool splittable() const override { return format_->splittable(); }
 
   const FileSource& source() const { return source_; }
   const std::shared_ptr<FileFormat>& format() const { return format_; }
 
-  FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
-               std::shared_ptr<ScanOptions> scan_options)
-      : Fragment(std::move(scan_options)),
-        source_(std::move(source)),
-        format_(std::move(format)) {}
-
+ protected:
   FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                std::shared_ptr<ScanOptions> scan_options,
                std::shared_ptr<Expression> partition_expression)
@@ -182,7 +176,6 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
         source_(std::move(source)),
         format_(std::move(format)) {}
 
- protected:
   FileSource source_;
   std::shared_ptr<FileFormat> format_;
 
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index aa87691..107f9c4 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -183,26 +183,29 @@ class RowGroupSkipper {
 
   RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
                   parquet::ArrowReaderProperties arrow_properties,
-                  std::shared_ptr<Expression> filter)
+                  std::shared_ptr<Expression> filter, std::vector<int> row_groups)
       : metadata_(std::move(metadata)),
         arrow_properties_(std::move(arrow_properties)),
         filter_(std::move(filter)),
-        row_group_idx_(0) {
-    num_row_groups_ = metadata_->num_row_groups();
-  }
+        row_group_idx_(0),
+        row_groups_(std::move(row_groups)),
+        num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups()
+                                            : static_cast<int>(row_groups_.size())) {}
 
   int Next() {
     while (row_group_idx_ < num_row_groups_) {
-      const auto row_group_idx = row_group_idx_++;
-      const auto row_group = metadata_->RowGroup(row_group_idx);
+      const int row_group =
+          row_groups_.empty() ? row_group_idx_++ : row_groups_[row_group_idx_++];
+
+      const auto row_group_metadata = metadata_->RowGroup(row_group);
 
-      const auto num_rows = row_group->num_rows();
-      if (CanSkip(*row_group)) {
+      const int64_t num_rows = row_group_metadata->num_rows();
+      if (CanSkip(*row_group_metadata)) {
         rows_skipped_ += num_rows;
         continue;
       }
 
-      return row_group_idx;
+      return row_group;
     }
 
     return kIterationDone;
@@ -225,6 +228,7 @@ class RowGroupSkipper {
   parquet::ArrowReaderProperties arrow_properties_;
   std::shared_ptr<Expression> filter_;
   int row_group_idx_;
+  std::vector<int> row_groups_;
   int num_row_groups_;
   int64_t rows_skipped_;
 };
@@ -234,7 +238,8 @@ class ParquetScanTaskIterator {
   static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
                                        std::shared_ptr<ScanContext> context,
                                        std::unique_ptr<parquet::ParquetFileReader> reader,
-                                       parquet::ArrowReaderProperties arrow_properties) {
+                                       parquet::ArrowReaderProperties arrow_properties,
+                                       const std::vector<int>& row_groups) {
     auto metadata = reader->metadata();
 
     auto column_projection = InferColumnProjection(*metadata, arrow_properties, options);
@@ -244,7 +249,7 @@ class ParquetScanTaskIterator {
                                                    arrow_properties, &arrow_reader));
 
     RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                            options->filter);
+                            options->filter, row_groups);
 
     return ScanTaskIterator(ParquetScanTaskIterator(
         std::move(options), std::move(context), std::move(column_projection),
@@ -373,12 +378,86 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
     std::shared_ptr<ScanContext> context) const {
+  return ScanFile(source, std::move(options), std::move(context), {});
+}
+
+Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
+    const FileSource& source, std::shared_ptr<ScanOptions> options,
+    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
   auto properties = MakeReaderProperties(*this, context->pool);
   ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
 
+  for (int i : row_groups) {
+    if (i >= reader->metadata()->num_row_groups()) {
+      return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
+                                " only has ", reader->metadata()->num_row_groups(),
+                                " row groups");
+    }
+  }
+
   auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
   return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties));
+                                       std::move(reader), std::move(arrow_properties),
+                                       row_groups);
+}
+
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<ScanOptions> options,
+    std::shared_ptr<Expression> partition_expression, std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(
+      new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options),
+                              std::move(partition_expression), std::move(row_groups)));
+}
+
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<ScanOptions> options,
+    std::shared_ptr<Expression> partition_expression) {
+  return std::shared_ptr<FileFragment>(
+      new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options),
+                              std::move(partition_expression), {}));
+}
+
+Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
+    const ParquetFileFragment& fragment, std::shared_ptr<Expression> extra_filter) {
+  auto properties = MakeReaderProperties(*this);
+  ARROW_ASSIGN_OR_RAISE(auto reader,
+                        OpenReader(fragment.source(), std::move(properties)));
+
+  auto arrow_properties =
+      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
+  auto metadata = reader->metadata();
+
+  auto row_groups = fragment.row_groups();
+  if (row_groups.empty()) {
+    row_groups = internal::Iota(metadata->num_row_groups());
+  }
+  FragmentVector fragments(row_groups.size());
+
+  auto new_options = std::make_shared<ScanOptions>(*fragment.scan_options());
+  if (!extra_filter->Equals(true)) {
+    new_options->filter = and_(std::move(extra_filter), std::move(new_options->filter));
+  }
+
+  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
+                          new_options->filter, std::move(row_groups));
+
+  for (int i = 0, row_group = skipper.Next();
+       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
+    ARROW_ASSIGN_OR_RAISE(fragments[i++],
+                          MakeFragment(fragment.source(), new_options,
+                                       fragment.partition_expression(), {row_group}));
+  }
+
+  return MakeVectorIterator(std::move(fragments));
+}
+
+Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanContext> context) {
+  return parquet_format().ScanFile(source_, scan_options_, std::move(context),
+                                   row_groups_);
+}
+
+const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
+  return internal::checked_cast<const ParquetFileFormat&>(*format_);
 }
 
 }  // namespace dataset
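
GetRowGroupFragments above yields one single-row-group fragment per surviving row group, with RowGroupSkipper pruning groups whose metadata contradicts the combined filter. Collecting the iterator might look like the following sketch (not from the commit; it mirrors the iteration pattern used by the tests further below):

    #include <utility>

    #include "arrow/dataset/api.h"

    arrow::Result<arrow::dataset::FragmentVector> SplitByRowGroup(
        arrow::dataset::ParquetFileFormat* format,
        const arrow::dataset::ParquetFileFragment& fragment) {
      arrow::dataset::FragmentVector out;
      // extra_filter defaults to scalar(true), i.e. no additional pruning.
      ARROW_ASSIGN_OR_RAISE(auto it, format->GetRowGroupFragments(fragment));
      for (auto maybe_fragment : it) {
        ARROW_ASSIGN_OR_RAISE(auto one_row_group, std::move(maybe_fragment));
        out.push_back(std::move(one_row_group));
      }
      return out;
    }
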
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index e270c78..3192d0d 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <unordered_set>
 #include <utility>
+#include <vector>
 
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/type_fwd.h"
@@ -86,6 +87,54 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context) const override;
+
+  /// \brief Open a file for scanning, restricted to the specified row groups.
+  Result<ScanTaskIterator> ScanFile(const FileSource& source,
+                                    std::shared_ptr<ScanOptions> options,
+                                    std::shared_ptr<ScanContext> context,
+                                    const std::vector<int>& row_groups) const;
+
+  using FileFormat::MakeFragment;
+
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<ScanOptions> options,
+      std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Create a Fragment, restricted to the specified row groups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<ScanOptions> options,
+      std::shared_ptr<Expression> partition_expression, std::vector<int> row_groups);
+
+  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// Row groups whose metadata contradicts the fragment's filter or the extra_filter
+  /// will be excluded.
+  Result<FragmentIterator> GetRowGroupFragments(
+      const ParquetFileFragment& fragment,
+      std::shared_ptr<Expression> extra_filter = scalar(true));
+};
+
+class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
+ public:
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;
+
+  /// \brief The row groups viewed by this Fragment. This may be empty, which
+  /// signifies that all row groups are selected.
+  const std::vector<int>& row_groups() const { return row_groups_; }
+
+ private:
+  ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
+                      std::shared_ptr<ScanOptions> scan_options,
+                      std::shared_ptr<Expression> partition_expression,
+                      std::vector<int> row_groups)
+      : FileFragment(std::move(source), std::move(format), std::move(scan_options),
+                     std::move(partition_expression)),
+        row_groups_(std::move(row_groups)) {}
+
+  const ParquetFileFormat& parquet_format() const;
+
+  std::vector<int> row_groups_;
+
+  friend class ParquetFileFormat;
 };
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 9a3dcad..9a7c8fb 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -29,7 +29,9 @@
 #include "arrow/testing/util.h"
 #include "arrow/type.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/range.h"
 #include "parquet/arrow/writer.h"
+#include "parquet/metadata.h"
 
 namespace arrow {
 namespace dataset {
@@ -50,6 +52,8 @@ using parquet::arrow::WriteTable;
 
 using testing::Pointee;
 
+using internal::checked_pointer_cast;
+
 class ArrowParquetWriterMixin : public ::testing::Test {
  public:
   Status WriteRecordBatch(const RecordBatch& batch, FileWriter* writer) {
@@ -149,6 +153,12 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
     return Batches(std::move(scan_task_it));
   }
 
+  std::shared_ptr<RecordBatch> SingleBatch(Fragment* fragment) {
+    auto batches = IteratorToVector(Batches(fragment));
+    EXPECT_EQ(batches.size(), 1);
+    return batches.front();
+  }
+
   void CountRowsAndBatchesInScan(Fragment* fragment, int64_t expected_rows,
                                  int64_t expected_batches) {
     int64_t actual_rows = 0;
@@ -164,6 +174,34 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
     EXPECT_EQ(actual_batches, expected_batches);
   }
 
+  void CountRowsAndBatchesInScan(const std::shared_ptr<Fragment>& fragment,
+                                 int64_t expected_rows, int64_t expected_batches) {
+    return CountRowsAndBatchesInScan(fragment.get(), expected_rows, expected_batches);
+  }
+
+  void CountRowGroupsInFragment(const std::shared_ptr<Fragment>& fragment,
+                                std::vector<int> expected_row_groups,
+                                const Expression& filter,
+                                const Expression& extra_filter = *scalar(true)) {
+    fragment->scan_options()->filter = filter.Copy();
+
+    auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
+    ASSERT_OK_AND_ASSIGN(
+        auto row_group_fragments,
+        format_->GetRowGroupFragments(*parquet_fragment, extra_filter.Copy()));
+
+    auto expected_row_group = expected_row_groups.begin();
+    for (auto maybe_fragment : row_group_fragments) {
+      ASSERT_OK_AND_ASSIGN(auto fragment, std::move(maybe_fragment));
+      auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
+
+      auto i = *expected_row_group++;
+      EXPECT_EQ(parquet_fragment->row_groups(), std::vector<int>{i});
+
+      EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), i + 1);
+    }
+  }
+
  protected:
   std::shared_ptr<Schema> schema_ = schema({field("f64", float64())});
   std::shared_ptr<ParquetFileFormat> format_ = std::make_shared<ParquetFileFormat>();
@@ -369,34 +407,118 @@ TEST_F(TestParquetFileFormat, PredicatePushdown) {
   ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));
 
   opts_->filter = scalar(true);
-  CountRowsAndBatchesInScan(fragment.get(), kTotalNumRows, kNumRowGroups);
+  CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups);
 
   for (int64_t i = 1; i <= kNumRowGroups; i++) {
     opts_->filter = ("i64"_ == int64_t(i)).Copy();
-    CountRowsAndBatchesInScan(fragment.get(), i, 1);
+    CountRowsAndBatchesInScan(fragment, i, 1);
   }
 
-  /* Out of bound filters should skip all RowGroups. */
+  // Out of bound filters should skip all RowGroups.
   opts_->filter = scalar(false);
-  CountRowsAndBatchesInScan(fragment.get(), 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
   opts_->filter = ("i64"_ == int64_t(kNumRowGroups + 1)).Copy();
-  CountRowsAndBatchesInScan(fragment.get(), 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
   opts_->filter = ("i64"_ == int64_t(-1)).Copy();
-  CountRowsAndBatchesInScan(fragment.get(), 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
   // No rows match 1 and 2.
   opts_->filter = ("i64"_ == int64_t(1) and "u8"_ == uint8_t(2)).Copy();
-  CountRowsAndBatchesInScan(fragment.get(), 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
 
   opts_->filter = ("i64"_ == int64_t(2) or "i64"_ == int64_t(4)).Copy();
-  CountRowsAndBatchesInScan(fragment.get(), 2 + 4, 2);
+  CountRowsAndBatchesInScan(fragment, 2 + 4, 2);
 
   opts_->filter = ("i64"_ < int64_t(6)).Copy();
-  CountRowsAndBatchesInScan(fragment.get(), 5 * (5 + 1) / 2, 5);
+  CountRowsAndBatchesInScan(fragment, 5 * (5 + 1) / 2, 5);
 
   opts_->filter = ("i64"_ >= int64_t(6)).Copy();
-  CountRowsAndBatchesInScan(fragment.get(), kTotalNumRows - (5 * (5 + 1) / 2),
+  CountRowsAndBatchesInScan(fragment, kTotalNumRows - (5 * (5 + 1) / 2),
                             kNumRowGroups - 5);
 }
 
+TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
+  constexpr int64_t kNumRowGroups = 16;
+  constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
+
+  auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
+  auto source = GetFileSource(reader.get());
+
+  opts_ = ScanOptions::Make(reader->schema());
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));
+
+  CountRowGroupsInFragment(fragment, internal::Iota(static_cast<int>(kTotalNumRows)),
+                           *scalar(true));
+
+  for (int i = 0; i < kNumRowGroups; ++i) {
+    CountRowGroupsInFragment(fragment, {i}, "i64"_ == int64_t(i + 1));
+  }
+
+  // Out of bound filters should skip all RowGroups.
+  CountRowGroupsInFragment(fragment, {}, *scalar(false));
+  CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(kNumRowGroups + 1));
+  CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(-1));
+
+  // No rows match 1 and 2.
+  CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(1) and "u8"_ == uint8_t(2));
+  CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(2), "i64"_ == int64_t(4));
+
+  CountRowGroupsInFragment(fragment, {1, 3},
+                           "i64"_ == int64_t(2) or "i64"_ == int64_t(4));
+
+  CountRowGroupsInFragment(fragment, {0, 1, 2, 3, 4}, "i64"_ < int64_t(6));
+
+  CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast<int>(kNumRowGroups)),
+                           "i64"_ >= int64_t(6));
+
+  CountRowGroupsInFragment(fragment, {5, 6, 7}, "i64"_ >= int64_t(6),
+                           "i64"_ < int64_t(8));
+}
+
+TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) {
+  constexpr int64_t kNumRowGroups = 16;
+  constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
+
+  auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
+  auto source = GetFileSource(reader.get());
+
+  opts_ = ScanOptions::Make(reader->schema());
+
+  auto row_groups_fragment = [&](std::vector<int> row_groups) {
+    EXPECT_OK_AND_ASSIGN(auto fragment,
+                         format_->MakeFragment(*source, opts_, scalar(true), row_groups));
+    return internal::checked_pointer_cast<ParquetFileFragment>(fragment);
+  };
+
+  // empty selection is identical to selecting all row groups
+  EXPECT_TRUE(row_groups_fragment({})->row_groups().empty());
+  CountRowsAndBatchesInScan(row_groups_fragment({}), kTotalNumRows, kNumRowGroups);
+
+  // individual selection selects a single row group
+  for (int i = 0; i < kNumRowGroups; ++i) {
+    CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1);
+    EXPECT_EQ(row_groups_fragment({i})->row_groups(), std::vector<int>{i});
+  }
+
+  for (int i = 0; i < kNumRowGroups; ++i) {
+    // conflicting selection/filter
+    opts_->filter = ("i64"_ == int64_t(i)).Copy();
+    CountRowsAndBatchesInScan(row_groups_fragment({i}), 0, 0);
+  }
+
+  for (int i = 0; i < kNumRowGroups; ++i) {
+    // identical selection/filter
+    opts_->filter = ("i64"_ == int64_t(i + 1)).Copy();
+    CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1);
+  }
+
+  opts_->filter = ("i64"_ > int64_t(3)).Copy();
+  CountRowsAndBatchesInScan(row_groups_fragment({2, 3, 4, 5}), 4 + 5 + 6, 3);
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      IndexError,
+      testing::HasSubstr("only has " + std::to_string(kNumRowGroups) + " row groups"),
+      row_groups_fragment({kNumRowGroups + 1})->Scan(ctx_));
+}
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/dataset/filter.cc b/cpp/src/arrow/dataset/filter.cc
index 5004e5a..ca5c3a7 100644
--- a/cpp/src/arrow/dataset/filter.cc
+++ b/cpp/src/arrow/dataset/filter.cc
@@ -1260,5 +1260,7 @@ Result<std::shared_ptr<RecordBatch>> TreeEvaluator::Filter(
   return batch->Slice(0, 0);
 }
 
+std::shared_ptr<ScalarExpression> scalar(bool value) { return scalar(MakeScalar(value)); }
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 31a1380..4b0884e 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -60,24 +60,40 @@ std::vector<std::string> ScanOptions::MaterializedFields() const {
   return fields;
 }
 
-Result<RecordBatchIterator> InMemoryScanTask::Execute() {
-  return MakeVectorIterator(record_batches_);
-}
+Result<std::shared_ptr<Table>> ScanTask::ToTable(
+    const std::shared_ptr<ScanOptions>& options,
+    const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_task_it) {
+  std::mutex mutex;
+  RecordBatchVector batches;
 
-/// \brief GetScanTaskIterator transforms an Iterator<Fragment> in a
-/// flattened Iterator<ScanTask>.
-static ScanTaskIterator GetScanTaskIterator(FragmentIterator fragments,
-                                            std::shared_ptr<ScanContext> context) {
-  // Fragment -> ScanTaskIterator
-  auto fn = [context](std::shared_ptr<Fragment> fragment) {
-    return fragment->Scan(context);
-  };
+  auto task_group = context->TaskGroup();
+
+  for (auto maybe_scan_task : scan_task_it) {
+    ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
+
+    task_group->Append([&batches, &mutex, scan_task] {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
+        std::lock_guard<std::mutex> lock(mutex);
+        batches.emplace_back(std::move(batch));
+      }
 
-  // Iterator<Iterator<ScanTask>>
-  auto maybe_scantask_it = MakeMaybeMapIterator(fn, std::move(fragments));
+      return Status::OK();
+    });
+  }
+
+  // Wait for all tasks to complete, or the first error.
+  RETURN_NOT_OK(task_group->Finish());
 
-  // Iterator<ScanTask>
-  return MakeFlattenIterator(std::move(maybe_scantask_it));
+  std::shared_ptr<Table> out;
+  RETURN_NOT_OK(Table::FromRecordBatches(options->schema(), batches, &out));
+  return out;
+}
+
+Result<RecordBatchIterator> InMemoryScanTask::Execute() {
+  return MakeVectorIterator(record_batches_);
 }
 
 FragmentIterator Scanner::GetFragments() {
@@ -168,51 +184,9 @@ std::shared_ptr<internal::TaskGroup> ScanContext::TaskGroup() const {
   return internal::TaskGroup::MakeSerial();
 }
 
-struct TableAggregator {
-  void Append(std::shared_ptr<RecordBatch> batch) {
-    std::lock_guard<std::mutex> lock(m);
-    batches.emplace_back(std::move(batch));
-  }
-
-  Result<std::shared_ptr<Table>> Finish(const std::shared_ptr<Schema>& schema) {
-    std::shared_ptr<Table> out;
-    RETURN_NOT_OK(Table::FromRecordBatches(schema, batches, &out));
-    return out;
-  }
-
-  std::mutex m;
-  std::vector<std::shared_ptr<RecordBatch>> batches;
-};
-
-struct ScanTaskPromise {
-  Status operator()() {
-    ARROW_ASSIGN_OR_RAISE(auto it, task->Execute());
-    for (auto maybe_batch : it) {
-      ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
-      aggregator.Append(std::move(batch));
-    }
-
-    return Status::OK();
-  }
-
-  TableAggregator& aggregator;
-  std::shared_ptr<ScanTask> task;
-};
-
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
-  auto task_group = scan_context_->TaskGroup();
-
-  TableAggregator aggregator;
-  ARROW_ASSIGN_OR_RAISE(auto it, Scan());
-  for (auto maybe_scan_task : it) {
-    ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
-    task_group->Append(ScanTaskPromise{aggregator, std::move(scan_task)});
-  }
-
-  // Wait for all tasks to complete, or the first error.
-  RETURN_NOT_OK(task_group->Finish());
-
-  return aggregator.Finish(scan_options_->schema());
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
+  return ScanTask::ToTable(scan_options_, scan_context_, std::move(scan_task_it));
 }
 
 }  // namespace dataset
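
ScanTask::ToTable now runs scan tasks through the context's TaskGroup and funnels their batches into a single vector behind a mutex before assembling the Table. The same aggregation pattern, reduced to a self-contained sketch with plain std::thread standing in for arrow::internal::TaskGroup:

    #include <mutex>
    #include <thread>
    #include <vector>

    int main() {
      std::mutex mutex;
      std::vector<int> batches;        // stands in for RecordBatchVector
      std::vector<std::thread> tasks;  // stands in for the TaskGroup

      for (int i = 0; i < 8; ++i) {
        tasks.emplace_back([i, &mutex, &batches] {
          int batch = i * i;  // stands in for a batch from ScanTask::Execute()
          std::lock_guard<std::mutex> lock(mutex);
          batches.push_back(batch);
        });
      }
      for (auto& t : tasks) t.join();  // analogous to task_group->Finish()
      // `batches` now holds every task's output, in completion order.
    }
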
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 05b8e43..0304000 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -114,6 +114,14 @@ class ARROW_DS_EXPORT ScanTask {
   const std::shared_ptr<ScanOptions>& options() const { return options_; }
   const std::shared_ptr<ScanContext>& context() const { return context_; }
 
+  /// \brief Convert a sequence of ScanTasks into a Table.
+  ///
+  /// Use this convenience utility with care. This will serially materialize the
+  /// Scan result in memory before creating the Table.
+  static Result<std::shared_ptr<Table>> ToTable(
+      const std::shared_ptr<ScanOptions>& options,
+      const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_tasks);
+
  protected:
   ScanTask(std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
       : options_(std::move(options)), context_(std::move(context)) {}
diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h
index d299fa9..0cbd4a5 100644
--- a/cpp/src/arrow/dataset/scanner_internal.h
+++ b/cpp/src/arrow/dataset/scanner_internal.h
@@ -27,10 +27,9 @@
 namespace arrow {
 namespace dataset {
 
-static inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
-                                                    const ExpressionEvaluator& evaluator,
-                                                    const Expression& filter,
-                                                    MemoryPool* pool) {
+inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
+                                             const ExpressionEvaluator& evaluator,
+                                             const Expression& filter, MemoryPool* pool) {
   return MakeMaybeMapIterator(
       [&filter, &evaluator, pool](std::shared_ptr<RecordBatch> in) {
         return evaluator.Evaluate(filter, *in, pool).Map([&](compute::Datum selection) {
@@ -40,9 +39,9 @@ static inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
       std::move(it));
 }
 
-static inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
-                                                     RecordBatchProjector* projector,
-                                                     MemoryPool* pool) {
+inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
+                                              RecordBatchProjector* projector,
+                                              MemoryPool* pool) {
   return MakeMaybeMapIterator(
       [=](std::shared_ptr<RecordBatch> in) {
        // The RecordBatchProjector is shared across ScanTasks of the same
@@ -71,5 +70,21 @@ class FilterAndProjectScanTask : public ScanTask {
   std::shared_ptr<ScanTask> task_;
 };
 
+/// \brief GetScanTaskIterator transforms an Iterator<Fragment> in a
+/// flattened Iterator<ScanTask>.
+inline ScanTaskIterator GetScanTaskIterator(FragmentIterator fragments,
+                                            std::shared_ptr<ScanContext> context) {
+  // Fragment -> ScanTaskIterator
+  auto fn = [context](std::shared_ptr<Fragment> fragment) {
+    return fragment->Scan(context);
+  };
+
+  // Iterator<Iterator<ScanTask>>
+  auto maybe_scantask_it = MakeMaybeMapIterator(fn, std::move(fragments));
+
+  // Iterator<ScanTask>
+  return MakeFlattenIterator(std::move(maybe_scantask_it));
+}
+
 }  // namespace dataset
 }  // namespace arrow
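
GetScanTaskIterator maps each Fragment to its sequence of ScanTasks and flattens the nested iterator. The shape of that transformation as a plain-STL sketch, with eager vectors standing in for Arrow's lazy iterators:

    #include <vector>

    // Map every outer element to an inner sequence, then flatten.
    template <typename T, typename Fn>
    auto FlatMap(const std::vector<T>& outer, Fn fn) {
      decltype(fn(outer.front())) flat;
      for (const auto& item : outer) {
        auto inner = fn(item);
        flat.insert(flat.end(), inner.begin(), inner.end());
      }
      return flat;
    }

    int main() {
      std::vector<int> fragments = {1, 2, 3};
      // "Fragment" f yields f "scan tasks": {1, 2, 2, 3, 3, 3}.
      auto scan_tasks =
          FlatMap(fragments, [](int f) { return std::vector<int>(f, f); });
    }
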
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 7e33a2f..b7feaef 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -316,7 +316,7 @@ struct MakeFileSystemDatasetMixin {
 
 static const std::string& PathOf(const std::shared_ptr<Fragment>& fragment) {
   EXPECT_NE(fragment, nullptr);
-  EXPECT_EQ(fragment->type_name(), "file");
+  EXPECT_THAT(fragment->type_name(), "dummy");
   return internal::checked_cast<const FileFragment&>(*fragment).source().path();
 }
 
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index 141ede8..118b94d 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -53,6 +53,11 @@ class FileFormat;
 class FileFragment;
 class FileSystemDataset;
 
+class ParquetFileFormat;
+class ParquetFileFragment;
+
+class IpcFileFormat;
+
 class Expression;
 using ExpressionVector = std::vector<std::shared_ptr<Expression>>;
 
@@ -67,6 +72,10 @@ class ScalarExpression;
 class FieldReferenceExpression;
 class ExpressionEvaluator;
 
+/// forward declared to facilitate scalar(true) as a default for Expression parameters
+ARROW_DS_EXPORT
+std::shared_ptr<ScalarExpression> scalar(bool);
+
 class Partitioning;
 class PartitioningFactory;
 class PartitioningOrFactory;
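
The scalar(bool) forward declaration above exists so that headers such as file_parquet.h can write `extra_filter = scalar(true)` as a default argument without including the expression definitions; a default argument only needs a declaration in scope at the point of use. A self-contained sketch with stub types rather than Arrow's:

    #include <memory>

    struct Expression {
      bool value;
    };

    // The declaration alone is enough for the default argument below.
    std::shared_ptr<Expression> scalar(bool value);

    void Scan(std::shared_ptr<Expression> filter = scalar(true)) {}

    // The definition may live elsewhere (filter.cc, in Arrow's case).
    std::shared_ptr<Expression> scalar(bool value) {
      return std::make_shared<Expression>(Expression{value});
    }

    int main() { Scan(); }
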
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index f4a2d32..fae8a40 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -156,7 +156,7 @@ struct ARROW_EXPORT FileSelector {
 };
 
 /// \brief Abstract file system API
-class ARROW_EXPORT FileSystem {
+class ARROW_EXPORT FileSystem : public std::enable_shared_from_this<FileSystem> {
  public:
   virtual ~FileSystem();
 
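Deriving FileSystem from std::enable_shared_from_this lets code that holds only a raw FileSystem* (for example FileSource::filesystem() in this diff, which the new FileFragment.filesystem property wraps) recover an owning shared_ptr. A minimal sketch of the mechanism with a stand-in class:

    #include <cassert>
    #include <memory>

    class FileSystemLike
        : public std::enable_shared_from_this<FileSystemLike> {};

    int main() {
      auto fs = std::make_shared<FileSystemLike>();
      FileSystemLike* raw = fs.get();  // e.g. what a FileSource stores
      // Valid only because *raw is already owned by a shared_ptr.
      std::shared_ptr<FileSystemLike> owned = raw->shared_from_this();
      assert(owned == fs);
    }
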
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index fd5aac5..de55002 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -27,8 +27,6 @@ from pyarrow.lib cimport *
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.compat import frombytes, tobytes
 from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
-from pyarrow.types import (is_null, is_boolean, is_integer, is_floating,
-                           is_string)
 
 
 def _forbid_instantiation(klass, subclasses_instead=True):
@@ -336,6 +334,11 @@ cdef class FileSystemDataset(Dataset):
         return FileFormat.wrap(self.filesystem_dataset.format())
 
 
+def _empty_dataset_scanner(Schema schema not None, columns=None, filter=None):
+    dataset = UnionDataset(schema, children=[])
+    return Scanner(dataset, columns=columns, filter=filter)
+
+
 cdef class FileFormat:
 
     cdef:
@@ -367,6 +370,35 @@ cdef class FileFormat:
     cdef inline shared_ptr[CFileFormat] unwrap(self):
         return self.wrapped
 
+    def inspect(self, str path not None, FileSystem filesystem not None):
+        cdef:
+            shared_ptr[CSchema] c_schema
+
+        c_schema = GetResultValue(self.format.Inspect(CFileSource(
+            tobytes(path), filesystem.unwrap().get())))
+        return pyarrow_wrap_schema(move(c_schema))
+
+    def make_fragment(self, str path not None, FileSystem filesystem not None,
+                      Schema schema=None, columns=None, filter=None,
+                      Expression partition_expression=ScalarExpression(True)):
+        cdef:
+            shared_ptr[CScanOptions] c_options
+            shared_ptr[CFileFragment] c_fragment
+            Scanner scanner
+
+        if schema is None:
+            schema = self.inspect(path, filesystem)
+
+        scanner = _empty_dataset_scanner(schema, columns, filter)
+        c_options = scanner.unwrap().get().options()
+
+        c_fragment = GetResultValue(
+            self.format.MakeFragment(CFileSource(tobytes(path),
+                                                 filesystem.unwrap().get()),
+                                     move(c_options),
+                                     partition_expression.unwrap()))
+        return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+
 
 cdef class Fragment:
     """Fragment of data from a Dataset."""
@@ -386,8 +418,12 @@ cdef class Fragment:
         cdef Fragment self = Fragment()
 
         typ = frombytes(sp.get().type_name())
-        if typ == 'file':
+        if typ == 'ipc':
+            # IpcFileFormat does not have a corresponding subclass
+            # of FileFragment
             self = FileFragment.__new__(FileFragment)
+        elif typ == 'parquet':
+            self = ParquetFileFragment.__new__(ParquetFileFragment)
         else:
             self = Fragment()
 
@@ -396,12 +432,66 @@ cdef class Fragment:
 
     @property
     def partition_expression(self):
-        """
-        An Expression which evaluates to true for all data viewed by this
+        """An Expression which evaluates to true for all data viewed by this
         Fragment.
         """
         return Expression.wrap(self.fragment.partition_expression())
 
+    def to_table(self, use_threads=True, MemoryPool memory_pool=None):
+        """Convert this Fragment into a Table.
+
+        Use this convenience utility with care. This will serially materialize
+        the Scan result in memory before creating the Table.
+
+        Returns
+        -------
+        table : Table
+        """
+        cdef:
+            shared_ptr[CScanContext] context
+            shared_ptr[CScanOptions] options
+            CScanTaskIterator iterator
+            shared_ptr[CTable] table
+
+        options = self.fragment.scan_options()
+
+        context = make_shared[CScanContext]()
+        context.get().pool = maybe_unbox_memory_pool(memory_pool)
+
+        iterator = move(GetResultValue(self.fragment.Scan(context)))
+        table = GetResultValue(CScanTask.ToTable(options, context,
+                                                 move(iterator)))
+
+        return pyarrow_wrap_table(table)
+
+    def scan(self, MemoryPool memory_pool=None):
+        """Returns a stream of ScanTasks
+
+        The caller is responsible for dispatching/scheduling said tasks. Tasks
+        should be safe to run concurrently and to outlive the iterator.
+
+        Returns
+        -------
+        scan_tasks : iterator of ScanTask
+        """
+        cdef:
+            shared_ptr[CScanContext] context
+            CScanTaskIterator iterator
+            shared_ptr[CScanTask] task
+
+        # create scan context
+        context = make_shared[CScanContext]()
+        context.get().pool = maybe_unbox_memory_pool(memory_pool)
+
+        iterator = move(GetResultValue(self.fragment.Scan(move(context))))
+
+        while True:
+            task = GetResultValue(iterator.Next())
+            if task.get() == nullptr:
+                raise StopIteration()
+            else:
+                yield ScanTask.wrap(task)
+
 
 cdef class FileFragment(Fragment):
     """A Fragment representing a data file."""
@@ -416,11 +506,38 @@ cdef class FileFragment(Fragment):
     @property
     def path(self):
         """
-        The path of the data file viewed by this fragment.
+        The path of the data file viewed by this fragment, if it views a
+        file. If instead it views a buffer, this will be "<Buffer>".
         """
         return frombytes(self.file_fragment.source().path())
 
     @property
+    def filesystem(self):
+        """
+        The FileSystem containing the data file viewed by this fragment, if
+        it views a file. If instead it views a buffer, this will be None.
+        """
+        cdef:
+            shared_ptr[CFileSystem] fs
+        fs = self.file_fragment.source().filesystem().shared_from_this()
+        return FileSystem.wrap(fs)
+
+    @property
+    def buffer(self):
+        """
+        The buffer viewed by this fragment, if it views a buffer. If
+        instead it views a file, this will be None.
+        """
+        cdef:
+            shared_ptr[CBuffer] c_buffer
+        c_buffer = self.file_fragment.source().buffer()
+
+        if c_buffer.get() == nullptr:
+            return None
+
+        return pyarrow_wrap_buffer(c_buffer)
+
+    @property
     def format(self):
         """
         The format of the data file viewed by this fragment.
@@ -428,6 +545,52 @@ cdef class FileFragment(Fragment):
         return FileFormat.wrap(self.file_fragment.format())
 
 
+cdef class ParquetFileFragment(FileFragment):
+    """A Fragment representing a parquet file."""
+
+    cdef:
+        CParquetFileFragment* parquet_file_fragment
+
+    cdef void init(self, const shared_ptr[CFragment]& sp):
+        FileFragment.init(self, sp)
+        self.parquet_file_fragment = <CParquetFileFragment*> sp.get()
+
+    @property
+    def row_groups(self):
+        row_groups = set(self.parquet_file_fragment.row_groups())
+        if len(row_groups) != 0:
+            return row_groups
+        return None
+
+    def get_row_group_fragments(self, Expression extra_filter=None):
+        """
+        Yield a Fragment wrapping each row group in this ParquetFileFragment.
+        Row groups whose metadata contradicts either the filter provided on
+        construction of this Fragment or the extra_filter argument will be
+        excluded.
+        """
+        cdef:
+            CParquetFileFormat* c_format
+            CFragmentIterator c_iterator
+            shared_ptr[CExpression] c_extra_filter
+            shared_ptr[CFragment] c_fragment
+
+        if extra_filter is None:
+            extra_filter = ScalarExpression(True)
+        c_extra_filter = extra_filter.unwrap()
+
+        c_format = <CParquetFileFormat*> self.file_fragment.format().get()
+        c_iterator = move(GetResultValue(c_format.GetRowGroupFragments(deref(
+            self.parquet_file_fragment), move(c_extra_filter))))
+
+        while True:
+            c_fragment = GetResultValue(c_iterator.Next())
+            if c_fragment.get() == nullptr:
+                raise StopIteration()
+            else:
+                yield Fragment.wrap(c_fragment)
+
+
 cdef class ParquetFileFormatReaderOptions:
     cdef:
         CParquetFileFormatReaderOptions* options
@@ -474,14 +637,48 @@ cdef class ParquetFileFormat(FileFormat):
 
     def __init__(self, dict reader_options=dict()):
         self.init(<shared_ptr[CFileFormat]> make_shared[CParquetFileFormat]())
-        self.parquet_format = <CParquetFileFormat*> self.wrapped.get()
         for name, value in reader_options.items():
             setattr(self.reader_options, name, value)
 
+    cdef void init(self, const shared_ptr[CFileFormat]& sp):
+        FileFormat.init(self, sp)
+        self.parquet_format = <CParquetFileFormat*> self.wrapped.get()
+
     @property
     def reader_options(self):
         return ParquetFileFormatReaderOptions(self)
 
+    def make_fragment(self, str path not None, FileSystem filesystem not None,
+                      Schema schema=None, columns=None, filter=None,
+                      Expression partition_expression=ScalarExpression(True),
+                      row_groups=None):
+        cdef:
+            shared_ptr[CScanOptions] c_options
+            shared_ptr[CFileFragment] c_fragment
+            Scanner scanner
+            vector[int] c_row_groups
+
+        if row_groups is None:
+            return super().make_fragment(path, filesystem, schema, columns,
+                                         filter, partition_expression)
+        for row_group in set(row_groups):
+            c_row_groups.push_back(<int> row_group)
+
+        if schema is None:
+            schema = self.inspect(path, filesystem)
+
+        scanner = _empty_dataset_scanner(schema, columns, filter)
+        c_options = scanner.unwrap().get().options()
+
+        c_fragment = GetResultValue(
+            self.parquet_format.MakeFragment(CFileSource(tobytes(path),
+                                                         filesystem.unwrap()
+                                                         .get()),
+                                             move(c_options),
+                                             partition_expression.unwrap(),
+                                             move(c_row_groups)))
+        return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+
 
 cdef class IpcFileFormat(FileFormat):
 
@@ -1109,6 +1306,9 @@ cdef class Scanner:
         self.init(sp)
         return self
 
+    cdef inline shared_ptr[CScanner] unwrap(self):
+        return self.wrapped
+
     def scan(self):
         """Returns a stream of ScanTasks
 
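Fragment.to_table above is a thin wrapper around the new static ScanTask::ToTable. Its C++ equivalent, sketched using only signatures that appear in this diff:

    #include <memory>
    #include <utility>

    #include "arrow/dataset/api.h"

    arrow::Result<std::shared_ptr<arrow::Table>> FragmentToTable(
        const std::shared_ptr<arrow::dataset::Fragment>& fragment,
        const std::shared_ptr<arrow::dataset::ScanContext>& context) {
      // Reuse the ScanOptions the fragment was constructed with, exactly as
      // the binding does via Fragment::scan_options().
      auto options = fragment->scan_options();
      ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(context));
      return arrow::dataset::ScanTask::ToTable(options, context,
                                               std::move(scan_task_it));
    }
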
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 5e087ad..21f46e3 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -43,6 +43,7 @@ from pyarrow._dataset import (  # noqa
     NotExpression,
     OrExpression,
     ParquetFileFormat,
+    ParquetFileFragment,
     Partitioning,
     PartitioningFactory,
     ScalarExpression,
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 6229c50..7118f4d 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -132,16 +132,25 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         "arrow::dataset::InsertImplicitCasts"(
             const CExpression &, const CSchema&)
 
+    cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
+        pass
+
     cdef cppclass CScanContext "arrow::dataset::ScanContext":
         CMemoryPool * pool
 
-    cdef cppclass CScanTask" arrow::dataset::ScanTask":
-        CResult[CRecordBatchIterator] Execute()
-
     ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \
         "arrow::dataset::ScanTaskIterator"
 
+    cdef cppclass CScanTask" arrow::dataset::ScanTask":
+        CResult[CRecordBatchIterator] Execute()
+        @staticmethod
+        CResult[shared_ptr[CTable]] ToTable(
+            const shared_ptr[CScanOptions]&,
+            const shared_ptr[CScanContext]&,
+            CScanTaskIterator)
+
     cdef cppclass CFragment "arrow::dataset::Fragment":
+        const shared_ptr[CScanOptions]& scan_options() const
         CResult[CScanTaskIterator] Scan(shared_ptr[CScanContext] context)
         c_bool splittable()
         c_string type_name()
@@ -157,6 +166,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CResult[CScanTaskIterator] Scan()
         CResult[shared_ptr[CTable]] ToTable()
         CFragmentIterator GetFragments()
+        const shared_ptr[CScanOptions]& options()
 
     cdef cppclass CScannerBuilder "arrow::dataset::ScannerBuilder":
         CScannerBuilder(shared_ptr[CDataset],
@@ -210,15 +220,27 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
             vector[shared_ptr[CDatasetFactory]] factories)
 
     cdef cppclass CFileSource "arrow::dataset::FileSource":
-        const c_string& path()
+        const c_string& path() const
+        CFileSystem* filesystem() const
+        const shared_ptr[CBuffer]& buffer() const
+        CFileSource(c_string path, CFileSystem* filesystem)
 
     cdef cppclass CFileFormat "arrow::dataset::FileFormat":
-        c_string type_name()
+        c_string type_name() const
+        CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const
+        CResult[shared_ptr[CFileFragment]] MakeFragment(
+            CFileSource source,
+            shared_ptr[CScanOptions] options,
+            shared_ptr[CExpression] partition_expression)
 
     cdef cppclass CFileFragment "arrow::dataset::FileFragment"(
             CFragment):
-        const CFileSource& source()
-        shared_ptr[CFileFormat] format()
+        const CFileSource& source() const
+        const shared_ptr[CFileFormat]& format() const
+
+    cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
+            CFileFragment):
+        const vector[int]& row_groups() const
 
     cdef cppclass CFileSystemDataset \
             "arrow::dataset::FileSystemDataset"(CDataset):
@@ -243,6 +265,14 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
             CFileFormat):
         CParquetFileFormatReaderOptions reader_options
+        CResult[CFragmentIterator] GetRowGroupFragments(
+            const CParquetFileFragment&,
+            shared_ptr[CExpression] extra_filter)
+        CResult[shared_ptr[CFileFragment]] MakeFragment(
+            CFileSource source,
+            shared_ptr[CScanOptions] options,
+            shared_ptr[CExpression] partition_expression,
+            vector[int] row_groups)
 
     cdef cppclass CIpcFileFormat "arrow::dataset::IpcFileFormat"(
             CFileFormat):
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index f3d46ed..824d830 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -59,6 +59,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
         c_bool recursive
 
     cdef cppclass CFileSystem "arrow::fs::FileSystem":
+        shared_ptr[CFileSystem] shared_from_this()
         c_string type_name() const
         CResult[c_string] NormalizePath(c_string path)
         CResult[CFileInfo] GetFileInfo(const c_string& path)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index cf62b32..05ef3ff 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -85,19 +85,6 @@ def mockfs(request):
 
     mockfs = fs._MockFileSystem()
 
-    data = [
-        list(range(5)),
-        list(map(float, range(5))),
-        list(map(str, range(5)))
-    ]
-    schema = pa.schema([
-        pa.field('i64', pa.int64()),
-        pa.field('f64', pa.float64()),
-        pa.field('str', pa.string())
-    ])
-    batch = pa.record_batch(data, schema=schema)
-    table = pa.Table.from_batches([batch])
-
     directories = [
         'subdir/1/xxx',
         'subdir/2/yyy',
@@ -107,6 +94,21 @@ def mockfs(request):
         path = '{}/file{}.parquet'.format(directory, i)
         mockfs.create_dir(directory)
         with mockfs.open_output_stream(path) as out:
+            data = [
+                list(range(5)),
+                list(map(float, range(5))),
+                list(map(str, range(5))),
+                [i] * 5
+            ]
+            schema = pa.schema([
+                pa.field('i64', pa.int64()),
+                pa.field('f64', pa.float64()),
+                pa.field('str', pa.string()),
+                pa.field('const', pa.int64()),
+            ])
+            batch = pa.record_batch(data, schema=schema)
+            table = pa.Table.from_batches([batch])
+
             pq.write_table(table, out)
 
     return mockfs
@@ -179,7 +181,9 @@ def dataset(mockfs):
 
 
 def test_filesystem_dataset(mockfs):
-    schema = pa.schema([])
+    schema = pa.schema([
+        pa.field('const', pa.int64())
+    ])
 
     file_format = ds.ParquetFileFormat()
 
@@ -225,12 +229,24 @@ def test_filesystem_dataset(mockfs):
     assert set(dataset.files) == set(paths)
 
     fragments = list(dataset.get_fragments())
-    assert fragments[0].partition_expression.equals(
-        ds.AndExpression(root_partition, partitions[0]))
-    assert fragments[1].partition_expression.equals(
-        ds.AndExpression(root_partition, partitions[1]))
-    assert fragments[0].path == paths[0]
-    assert fragments[1].path == paths[1]
+    for fragment, partition, path in zip(fragments, partitions, paths):
+        assert fragment.partition_expression.equals(
+            ds.AndExpression(root_partition, partition))
+        assert fragment.path == path
+        assert isinstance(fragment, ds.ParquetFileFragment)
+        assert fragment.row_groups is None
+
+        row_group_fragments = list(fragment.get_row_group_fragments())
+        assert len(row_group_fragments) == 1
+        assert isinstance(fragment, ds.ParquetFileFragment)
+        assert row_group_fragments[0].path == path
+        assert row_group_fragments[0].row_groups == {0}
+
+    # test predicate pushdown using row group metadata
+    fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
+    assert len(fragments) == 2
+    assert len(list(fragments[0].get_row_group_fragments())) == 1
+    assert len(list(fragments[1].get_row_group_fragments())) == 0
 
 
 def test_dataset(dataset):
@@ -505,6 +521,7 @@ def test_filesystem_factory(mockfs, paths_or_selector):
         pa.field('i64', pa.int64()),
         pa.field('f64', pa.float64()),
         pa.field('str', pa.dictionary(pa.int32(), pa.string())),
+        pa.field('const', pa.int64()),
         pa.field('group', pa.int32()),
         pa.field('key', pa.string()),
     ]), check_metadata=False)
@@ -525,20 +542,61 @@ def test_filesystem_factory(mockfs, paths_or_selector):
         pa.array([0, 1, 2, 3, 4], type=pa.int32()),
         pa.array("0 1 2 3 4".split(), type=pa.string()))
     for task, group, key in zip(scanner.scan(), [1, 2], ['xxx', 'yyy']):
-        expected_group_column = pa.array([group] * 5, type=pa.int32())
-        expected_key_column = pa.array([key] * 5, type=pa.string())
+        expected_group = pa.array([group] * 5, type=pa.int32())
+        expected_key = pa.array([key] * 5, type=pa.string())
+        expected_const = pa.array([group - 1] * 5, type=pa.int64())
         for batch in task.execute():
-            assert batch.num_columns == 5
+            assert batch.num_columns == 6
             assert batch[0].equals(expected_i64)
             assert batch[1].equals(expected_f64)
             assert batch[2].equals(expected_str)
-            assert batch[3].equals(expected_group_column)
-            assert batch[4].equals(expected_key_column)
+            assert batch[3].equals(expected_const)
+            assert batch[4].equals(expected_group)
+            assert batch[5].equals(expected_key)
 
     table = dataset.to_table()
     assert isinstance(table, pa.Table)
     assert len(table) == 10
-    assert table.num_columns == 5
+    assert table.num_columns == 6
+
+
+def test_make_fragment(multisourcefs):
+    parquet_format = ds.ParquetFileFormat()
+    dataset = ds.dataset('/plain', filesystem=multisourcefs,
+                         format=parquet_format)
+
+    for path in dataset.files:
+        fragment = parquet_format.make_fragment(path, multisourcefs)
+        row_group_fragment = parquet_format.make_fragment(path, multisourcefs,
+                                                          row_groups=[0])
+        for f in [fragment, row_group_fragment]:
+            assert isinstance(f, ds.ParquetFileFragment)
+            assert f.path == path
+            assert isinstance(f.filesystem, type(multisourcefs))
+        assert fragment.row_groups is None
+        assert row_group_fragment.row_groups == {0}
+
+
+def test_parquet_row_group_fragments(tempdir):
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+
+    table = pa.table({'a': ['a', 'a', 'b', 'b'], 'b': [1, 2, 3, 4]})
+    pq.write_to_dataset(table, str(tempdir / "test_parquet_dataset"),
+                        partition_cols=["a"])
+
+    import pyarrow.dataset as ds
+    dataset = ds.dataset(str(tempdir / "test_parquet_dataset/"),
+                         format="parquet", partitioning="hive")
+
+    fragments = list(dataset.get_fragments())
+    f = fragments[0]
+    parquet_format = f.format
+    parquet_format.make_fragment(f.path, f.filesystem,
+                                 partition_expression=f.partition_expression)
+    parquet_format.make_fragment(
+        f.path, f.filesystem, partition_expression=f.partition_expression,
+        row_groups={1})
 
 
 def test_partitioning_factory(mockfs):
@@ -559,6 +617,7 @@ def test_partitioning_factory(mockfs):
         ("i64", pa.int64()),
         ("f64", pa.float64()),
         ("str", pa.string()),
+        ("const", pa.int64()),
         ("group", pa.int32()),
         ("key", pa.string()),
     ])