You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by bk...@apache.org on 2020/04/02 15:33:17 UTC

[arrow] branch master updated: ARROW-8276: [C++][Dataset] Use Scanner for Fragment.to_table

This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ad2cc4  ARROW-8276: [C++][Dataset] Use Scanner for Fragment.to_table
9ad2cc4 is described below

commit 9ad2cc4c8844217e7e04980c7e2aff97f3a895b3
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Thu Apr 2 11:32:59 2020 -0400

    ARROW-8276: [C++][Dataset] Use Scanner for Fragment.to_table
    
    This way batches yielded by Fragment.Scan are projected and filtered.
    
    Closes #6765 from bkietz/8276-Scanning-a-Fragment-does-
    
    Lead-authored-by: Benjamin Kietzman <be...@gmail.com>
    Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
    Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
 cpp/src/arrow/dataset/file_base.cc           |  21 +---
 cpp/src/arrow/dataset/partition.cc           |  13 +++
 cpp/src/arrow/dataset/partition.h            |   3 +
 cpp/src/arrow/dataset/scanner.cc             |  64 +++++------
 cpp/src/arrow/dataset/scanner.h              |  15 ++-
 python/pyarrow/_dataset.pyx                  | 139 ++++++++++++----------
 python/pyarrow/includes/libarrow_dataset.pxd |  25 ++--
 python/pyarrow/tests/test_dataset.py         | 165 +++++++++++++++++++++++++--
 8 files changed, 303 insertions(+), 142 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 75df308..66e0e1d 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -190,25 +190,8 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
     }
 
     // if possible, extract a partition key and pass it to the projector
-    auto projector = &options[ref.i]->projector;
-    {
-      int index = -1;
-      std::shared_ptr<Scalar> value_to_materialize;
-
-      DCHECK_OK(KeyValuePartitioning::VisitKeys(
-          *partition, [&](const std::string& name, const std::shared_ptr<Scalar>& value) {
-            if (index != -1) return Status::OK();
-
-            index = projector->schema()->GetFieldIndex(name);
-            if (index != -1) value_to_materialize = value;
-
-            return Status::OK();
-          }));
-
-      if (index != -1) {
-        RETURN_NOT_OK(projector->SetDefaultValue(index, std::move(value_to_materialize)));
-      }
-    }
+    RETURN_NOT_OK(KeyValuePartitioning::SetDefaultValuesFromKeys(
+        *partition, &options[ref.i]->projector));
 
     if (ref.info().IsFile()) {
       // generate a fragment for this file
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index ae64894..6ebd626 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -119,6 +119,19 @@ Status KeyValuePartitioning::VisitKeys(
                  checked_cast<const ScalarExpression*>(rhs)->value());
 }
 
+Status KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
+                                                      RecordBatchProjector* projector) {
+  return KeyValuePartitioning::VisitKeys(
+      expr, [projector](const std::string& name, const std::shared_ptr<Scalar>& value) {
+        ARROW_ASSIGN_OR_RAISE(auto match,
+                              FieldRef(name).FindOneOrNone(*projector->schema()));
+        if (match.indices().empty()) {
+          return Status::OK();
+        }
+        return projector->SetDefaultValue(match, value);
+      });
+}
+
 Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
     const Key& key, const Schema& schema) {
   ARROW_ASSIGN_OR_RAISE(auto field, FieldRef(key.name).GetOneOrNone(schema));
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index f8322a2..463d06c 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -159,6 +159,9 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
       const std::function<Status(const std::string& name,
                                  const std::shared_ptr<Scalar>& value)>& visitor);
 
+  static Status SetDefaultValuesFromKeys(const Expression& expr,
+                                         RecordBatchProjector* projector);
+
   /// Convert a Key to a full expression.
   /// If the field referenced in key is absent from the schema will be ignored.
   static Result<std::shared_ptr<Expression>> ConvertKey(const Key& key,
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 4b0884e..346150e 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -60,43 +60,15 @@ std::vector<std::string> ScanOptions::MaterializedFields() const {
   return fields;
 }
 
-Result<std::shared_ptr<Table>> ScanTask::ToTable(
-    const std::shared_ptr<ScanOptions>& options,
-    const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_task_it) {
-  std::mutex mutex;
-  RecordBatchVector batches;
-
-  auto task_group = context->TaskGroup();
-
-  for (auto maybe_scan_task : scan_task_it) {
-    ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
-
-    task_group->Append([&batches, &mutex, scan_task] {
-      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
-
-      for (auto maybe_batch : batch_it) {
-        ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
-        std::lock_guard<std::mutex> lock(mutex);
-        batches.emplace_back(std::move(batch));
-      }
-
-      return Status::OK();
-    });
-  }
-
-  // Wait for all tasks to complete, or the first error.
-  RETURN_NOT_OK(task_group->Finish());
-
-  std::shared_ptr<Table> out;
-  RETURN_NOT_OK(Table::FromRecordBatches(options->schema(), batches, &out));
-  return out;
-}
-
 Result<RecordBatchIterator> InMemoryScanTask::Execute() {
   return MakeVectorIterator(record_batches_);
 }
 
 FragmentIterator Scanner::GetFragments() {
+  if (fragment_ != nullptr) {
+    return MakeVectorIterator(FragmentVector{fragment_});
+  }
+
   // Transform Datasets in a flat Iterator<Fragment>. This
   // iterator is lazily constructed, i.e. Dataset::GetFragments is
   // not invoked until a Fragment is requested.
@@ -186,7 +158,33 @@ std::shared_ptr<internal::TaskGroup> ScanContext::TaskGroup() const {
 
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
   ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
-  return ScanTask::ToTable(scan_options_, scan_context_, std::move(scan_task_it));
+  std::mutex mutex;
+  RecordBatchVector batches;
+
+  auto task_group = scan_context_->TaskGroup();
+
+  for (auto maybe_scan_task : scan_task_it) {
+    ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
+
+    task_group->Append([&batches, &mutex, scan_task] {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
+        std::lock_guard<std::mutex> lock(mutex);
+        batches.emplace_back(std::move(batch));
+      }
+
+      return Status::OK();
+    });
+  }
+
+  // Wait for all tasks to complete, or the first error.
+  RETURN_NOT_OK(task_group->Finish());
+
+  std::shared_ptr<Table> out;
+  RETURN_NOT_OK(Table::FromRecordBatches(scan_options_->schema(), batches, &out));
+  return out;
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 0304000..e94efd4 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -114,14 +114,6 @@ class ARROW_DS_EXPORT ScanTask {
   const std::shared_ptr<ScanOptions>& options() const { return options_; }
   const std::shared_ptr<ScanContext>& context() const { return context_; }
 
-  /// \brief Convert a sequence of ScanTasks into a Table.
-  ///
-  /// Use this convenience utility with care. This will serially materialize the
-  /// Scan result in memory before creating the Table.
-  static Result<std::shared_ptr<Table>> ToTable(
-      const std::shared_ptr<ScanOptions>& options,
-      const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_tasks);
-
  protected:
   ScanTask(std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
       : options_(std::move(options)), context_(std::move(context)) {}
@@ -166,6 +158,11 @@ class ARROW_DS_EXPORT Scanner {
         scan_options_(std::move(scan_options)),
         scan_context_(std::move(scan_context)) {}
 
+  Scanner(std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanContext> scan_context)
+      : fragment_(std::move(fragment)),
+        scan_options_(fragment_->scan_options()),
+        scan_context_(std::move(scan_context)) {}
+
   /// \brief The Scan operator returns a stream of ScanTask. The caller is
   /// responsible to dispatch/schedule said tasks. Tasks should be safe to run
   /// in a concurrent fashion and outlive the iterator.
@@ -188,6 +185,8 @@ class ARROW_DS_EXPORT Scanner {
 
  protected:
   std::shared_ptr<Dataset> dataset_;
+  // TODO(ARROW-8065) remove fragment_ after a Dataset is constructible from fragments
+  std::shared_ptr<Fragment> fragment_;
   std::shared_ptr<ScanOptions> scan_options_;
   std::shared_ptr<ScanContext> scan_context_;
 };
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 3970fea..556faa7 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -333,10 +333,43 @@ cdef class FileSystemDataset(Dataset):
         """The FileFormat of this source."""
         return FileFormat.wrap(self.filesystem_dataset.format())
 
+cdef shared_ptr[CExpression] _insert_implicit_casts(Expression filter,
+                                                    Schema schema) except *:
+    assert schema is not None
 
-def _empty_dataset_scanner(Schema schema not None, columns=None, filter=None):
-    dataset = UnionDataset(schema, children=[])
-    return Scanner(dataset, columns=columns, filter=filter)
+    if filter is None:
+        return ScalarExpression(True).unwrap()
+
+    return GetResultValue(
+        CInsertImplicitCasts(
+            deref(filter.unwrap().get()),
+            deref(pyarrow_unwrap_schema(schema).get())
+        )
+    )
+
+
+cdef shared_ptr[CScanOptions] _make_scan_options(
+        Schema schema, Expression partition_expression, object columns=None,
+        Expression filter=None) except *:
+    cdef:
+        shared_ptr[CScanOptions] options
+        CExpression* c_partition_expression
+
+    assert schema is not None
+    assert partition_expression is not None
+
+    filter = Expression.wrap(_insert_implicit_casts(filter, schema))
+    filter = filter.assume(partition_expression)
+
+    empty_dataset = UnionDataset(schema, children=[])
+    scanner = Scanner(empty_dataset, columns=columns, filter=filter)
+    options = scanner.unwrap().get().options()
+
+    c_partition_expression = partition_expression.unwrap().get()
+    check_status(CSetPartitionKeysInProjector(deref(c_partition_expression),
+                                              &options.get().projector))
+
+    return options
 
 
 cdef class FileFormat:
@@ -371,6 +404,7 @@ cdef class FileFormat:
         return self.wrapped
 
     def inspect(self, str path not None, FileSystem filesystem not None):
+        """Infer the schema of a file."""
         cdef:
             shared_ptr[CSchema] c_schema
 
@@ -381,16 +415,20 @@ cdef class FileFormat:
     def make_fragment(self, str path not None, FileSystem filesystem not None,
                       Schema schema=None, columns=None, filter=None,
                       Expression partition_expression=ScalarExpression(True)):
+        """
+        Make a FileFragment of this FileFormat. The filter may not reference
+        fields absent from the provided schema. If no schema is provided then
+        one will be inferred.
+        """
         cdef:
             shared_ptr[CScanOptions] c_options
             shared_ptr[CFileFragment] c_fragment
-            Scanner scanner
 
         if schema is None:
             schema = self.inspect(path, filesystem)
 
-        scanner = _empty_dataset_scanner(schema, columns, filter)
-        c_options = scanner.unwrap().get().options()
+        c_options = _make_scan_options(schema, partition_expression,
+                                       columns, filter)
 
         c_fragment = GetResultValue(
             self.format.MakeFragment(CFileSource(tobytes(path),
@@ -436,6 +474,14 @@ cdef class Fragment:
         self.init(sp)
         return self
 
+    cdef inline shared_ptr[CFragment] unwrap(self):
+        return self.wrapped
+
+    @property
+    def schema(self):
+        """Return the schema of batches scanned from this Fragment."""
+        return pyarrow_wrap_schema(self.fragment.schema())
+
     @property
     def partition_expression(self):
         """An Expression which evaluates to true for all data viewed by this
@@ -453,22 +499,8 @@ cdef class Fragment:
         -------
         table : Table
         """
-        cdef:
-            shared_ptr[CScanContext] context
-            shared_ptr[CScanOptions] options
-            CScanTaskIterator iterator
-            shared_ptr[CTable] table
-
-        options = self.fragment.scan_options()
-
-        context = make_shared[CScanContext]()
-        context.get().pool = maybe_unbox_memory_pool(memory_pool)
-
-        iterator = move(GetResultValue(self.fragment.Scan(context)))
-        table = GetResultValue(CScanTask.ToTable(options, context,
-                                                 move(iterator)))
-
-        return pyarrow_wrap_table(table)
+        scanner = Scanner._from_fragment(self, use_threads, memory_pool)
+        return scanner.to_table()
 
     def scan(self, MemoryPool memory_pool=None):
         """Returns a stream of ScanTasks
@@ -480,23 +512,7 @@ cdef class Fragment:
         -------
         scan_tasks : iterator of ScanTask
         """
-        cdef:
-            shared_ptr[CScanContext] context
-            CScanTaskIterator iterator
-            shared_ptr[CScanTask] task
-
-        # create scan context
-        context = make_shared[CScanContext]()
-        context.get().pool = maybe_unbox_memory_pool(memory_pool)
-
-        iterator = move(GetResultValue(self.fragment.Scan(move(context))))
-
-        while True:
-            task = GetResultValue(iterator.Next())
-            if task.get() == nullptr:
-                raise StopIteration()
-            else:
-                yield ScanTask.wrap(task)
+        return Scanner._from_fragment(self, memory_pool=memory_pool).scan()
 
 
 cdef class FileFragment(Fragment):
@@ -581,10 +597,7 @@ cdef class ParquetFileFragment(FileFragment):
             shared_ptr[CExpression] c_extra_filter
             shared_ptr[CFragment] c_fragment
 
-        if extra_filter is None:
-            extra_filter = ScalarExpression(True)
-        c_extra_filter = extra_filter.unwrap()
-
+        c_extra_filter = _insert_implicit_casts(extra_filter, self.schema)
         c_format = <CParquetFileFormat*> self.file_fragment.format().get()
         c_iterator = move(GetResultValue(c_format.GetRowGroupFragments(deref(
             self.parquet_file_fragment), move(c_extra_filter))))
@@ -694,7 +707,6 @@ cdef class ParquetFileFormat(FileFormat):
         cdef:
             shared_ptr[CScanOptions] c_options
             shared_ptr[CFileFragment] c_fragment
-            Scanner scanner
             vector[int] c_row_groups
 
         if row_groups is None:
@@ -706,8 +718,8 @@ cdef class ParquetFileFormat(FileFormat):
         if schema is None:
             schema = self.inspect(path, filesystem)
 
-        scanner = _empty_dataset_scanner(schema, columns, filter)
-        c_options = scanner.unwrap().get().options()
+        c_options = _make_scan_options(schema, partition_expression,
+                                       columns, filter)
 
         c_fragment = GetResultValue(
             self.parquet_format.MakeFragment(CFileSource(tobytes(path),
@@ -1314,6 +1326,8 @@ cdef class Scanner:
         # create scan context
         context = make_shared[CScanContext]()
         context.get().pool = maybe_unbox_memory_pool(memory_pool)
+        if use_threads is not None:
+            context.get().use_threads = use_threads
 
         # create scanner builder
         builder = GetResultValue(
@@ -1322,18 +1336,11 @@ cdef class Scanner:
 
         # set the builder's properties
         if columns is not None:
-            columns_to_project = [tobytes(c) for c in columns]
-            check_status(builder.get().Project(columns_to_project))
-        if filter is not None:
-            filter_expression = GetResultValue(
-                CInsertImplicitCasts(
-                    deref(filter.unwrap().get()),
-                    deref(builder.get().schema().get())
-                )
-            )
-            check_status(builder.get().Filter(filter_expression))
-        if use_threads is not None:
-            check_status(builder.get().UseThreads(use_threads))
+            check_status(builder.get().Project([tobytes(c) for c in columns]))
+
+        check_status(builder.get().Filter(_insert_implicit_casts(
+            filter, pyarrow_wrap_schema(builder.get().schema())
+        )))
 
         check_status(builder.get().BatchSize(batch_size))
 
@@ -1341,16 +1348,28 @@ cdef class Scanner:
         scanner = GetResultValue(builder.get().Finish())
         self.init(scanner)
 
-    cdef void init(self, shared_ptr[CScanner]& sp):
+    cdef void init(self, const shared_ptr[CScanner]& sp):
         self.wrapped = sp
         self.scanner = sp.get()
 
     @staticmethod
-    cdef wrap(shared_ptr[CScanner]& sp):
+    cdef wrap(const shared_ptr[CScanner]& sp):
         cdef Scanner self = Scanner.__new__(Scanner)
         self.init(sp)
         return self
 
+    @staticmethod
+    def _from_fragment(Fragment fragment not None, bint use_threads=True,
+                       MemoryPool memory_pool=None):
+        cdef:
+            shared_ptr[CScanContext] context
+
+        context = make_shared[CScanContext]()
+        context.get().pool = maybe_unbox_memory_pool(memory_pool)
+        context.get().use_threads = use_threads
+
+        return Scanner.wrap(make_shared[CScanner](fragment.unwrap(), context))
+
     cdef inline shared_ptr[CScanner] unwrap(self):
         return self.wrapped
 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 7118f4d..467a84c 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -132,10 +132,14 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         "arrow::dataset::InsertImplicitCasts"(
             const CExpression &, const CSchema&)
 
-    cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
+    cdef cppclass CRecordBatchProjector "arrow::dataset::RecordBatchProjector":
         pass
 
+    cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
+        CRecordBatchProjector projector
+
     cdef cppclass CScanContext "arrow::dataset::ScanContext":
+        c_bool use_threads
         CMemoryPool * pool
 
     ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \
@@ -143,18 +147,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef cppclass CScanTask" arrow::dataset::ScanTask":
         CResult[CRecordBatchIterator] Execute()
-        @staticmethod
-        CResult[shared_ptr[CTable]] ToTable(
-            const shared_ptr[CScanOptions]&,
-            const shared_ptr[CScanContext]&,
-            CScanTaskIterator)
 
     cdef cppclass CFragment "arrow::dataset::Fragment":
-        const shared_ptr[CScanOptions]& scan_options() const
         CResult[CScanTaskIterator] Scan(shared_ptr[CScanContext] context)
-        c_bool splittable()
-        c_string type_name()
-        const shared_ptr[CExpression]& partition_expression()
+        const shared_ptr[CSchema]& schema() const
+        c_bool splittable() const
+        c_string type_name() const
+        const shared_ptr[CExpression]& partition_expression() const
 
     ctypedef vector[shared_ptr[CFragment]] CFragmentVector \
         "arrow::dataset::FragmentVector"
@@ -163,6 +162,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         "arrow::dataset::FragmentIterator"
 
     cdef cppclass CScanner "arrow::dataset::Scanner":
+        CScanner(shared_ptr[CFragment], shared_ptr[CScanContext])
         CResult[CScanTaskIterator] Scan()
         CResult[shared_ptr[CTable]] ToTable()
         CFragmentIterator GetFragments()
@@ -309,6 +309,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         shared_ptr[CPartitioning] partitioning() const
         shared_ptr[CPartitioningFactory] factory() const
 
+    cdef CStatus CSetPartitionKeysInProjector \
+        "arrow::dataset::KeyValuePartitioning::SetDefaultValuesFromKeys"(
+            const CExpression& partition_expression,
+            CRecordBatchProjector* projector)
+
     cdef cppclass CFileSystemFactoryOptions \
             "arrow::dataset::FileSystemFactoryOptions":
         CPartitioningOrFactory partitioning
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 9d46acf..037ba52 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -621,29 +621,170 @@ def test_make_fragment(multisourcefs):
         assert row_group_fragment.row_groups == {0}
 
 
-@pytest.mark.pandas
-def test_parquet_row_group_fragments(tempdir):
-    import pyarrow as pa
+def _create_dataset_for_fragments(tempdir, chunk_size=None):
     import pyarrow.parquet as pq
 
-    table = pa.table({'a': ['a', 'a', 'b', 'b'], 'b': [1, 2, 3, 4]})
-
+    table = pa.table(
+        {'f1': range(8), 'f2': [1] * 8, 'part': ['a'] * 4 + ['b'] * 4}
+    )
     # write_to_dataset currently requires pandas
     pq.write_to_dataset(table, str(tempdir / "test_parquet_dataset"),
-                        partition_cols=["a"])
+                        partition_cols=["part"], chunk_size=chunk_size)
 
-    import pyarrow.dataset as ds
     dataset = ds.dataset(str(tempdir / "test_parquet_dataset/"),
                          format="parquet", partitioning="hive")
+    return table, dataset
+
+
+@pytest.mark.pandas
+@pytest.mark.parquet
+def test_fragments(tempdir):
+    table, dataset = _create_dataset_for_fragments(tempdir)
 
+    # list fragments
     fragments = list(dataset.get_fragments())
+    assert len(fragments) == 2
     f = fragments[0]
-    parquet_format = f.format
-    parquet_format.make_fragment(f.path, f.filesystem,
-                                 partition_expression=f.partition_expression)
-    parquet_format.make_fragment(
-        f.path, f.filesystem, partition_expression=f.partition_expression,
+
+    # file's schema does not include partition column
+    phys_schema = f.schema.remove(f.schema.get_field_index('part'))
+    assert f.format.inspect(f.path, f.filesystem) == phys_schema
+    assert f.partition_expression.equals(ds.field('part') == 'a')
+
+    # scanning fragment includes partition columns
+    result = f.to_table()
+    assert f.schema == result.schema
+    assert result.column_names == ['f1', 'f2', 'part']
+    assert len(result) == 4
+    assert result.equals(table.slice(0, 4))
+
+    # scanning fragments follow column projection
+    fragments = list(dataset.get_fragments(columns=['f1', 'part']))
+    assert len(fragments) == 2
+    result = fragments[0].to_table()
+    assert result.column_names == ['f1', 'part']
+    assert len(result) == 4
+
+    # scanning fragments follow filter predicate
+    fragments = list(dataset.get_fragments(filter=ds.field('f1') < 2))
+    assert len(fragments) == 2
+    result = fragments[0].to_table()
+    assert result.column_names == ['f1', 'f2', 'part']
+    assert len(result) == 2
+    result = fragments[1].to_table()
+    assert len(result) == 0
+
+
+@pytest.mark.pandas
+@pytest.mark.parquet
+def test_fragments_reconstruct(tempdir):
+    table, dataset = _create_dataset_for_fragments(tempdir)
+
+    def assert_yields_projected(fragment, row_slice, columns):
+        actual = fragment.to_table()
+        assert actual.column_names == columns
+
+        expected = table.slice(*row_slice).to_pandas()[[*columns]]
+        assert actual.equals(pa.Table.from_pandas(expected))
+
+    fragment = list(dataset.get_fragments())[0]
+    parquet_format = fragment.format
+
+    # manually re-construct a fragment, with explicit schema
+    new_fragment = parquet_format.make_fragment(
+        fragment.path, fragment.filesystem, schema=dataset.schema,
+        partition_expression=fragment.partition_expression)
+    assert new_fragment.to_table().equals(fragment.to_table())
+    assert_yields_projected(new_fragment, (0, 4), table.column_names)
+
+    # filter / column projection, inspected schema
+    new_fragment = parquet_format.make_fragment(
+        fragment.path, fragment.filesystem,
+        columns=['f1'], filter=ds.field('f1') < 2,
+        partition_expression=fragment.partition_expression)
+    assert_yields_projected(new_fragment, (0, 2), ['f1'])
+
+    # filter requiring cast / column projection, inspected schema
+    new_fragment = parquet_format.make_fragment(
+        fragment.path, fragment.filesystem,
+        columns=['f1'], filter=ds.field('f1') < 2.0,
+        partition_expression=fragment.partition_expression)
+    assert_yields_projected(new_fragment, (0, 2), ['f1'])
+
+    # filter on the partition column, explicit schema
+    new_fragment = parquet_format.make_fragment(
+        fragment.path, fragment.filesystem, schema=dataset.schema,
+        filter=ds.field('part') == 'a',
+        partition_expression=fragment.partition_expression)
+    assert_yields_projected(new_fragment, (0, 4), table.column_names)
+
+    # filter on the partition column, inspected schema
+    with pytest.raises(ValueError, match="Field named 'part' not found"):
+        new_fragment = parquet_format.make_fragment(
+            fragment.path, fragment.filesystem,
+            filter=ds.field('part') == 'a',
+            partition_expression=fragment.partition_expression)
+
+
+@pytest.mark.pandas
+@pytest.mark.parquet
+def test_fragments_parquet_row_groups(tempdir):
+    table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2)
+
+    fragment = list(dataset.get_fragments())[0]
+
+    # list and scan row group fragments
+    row_group_fragments = list(fragment.get_row_group_fragments())
+    assert len(row_group_fragments) == 2
+    result = row_group_fragments[0].to_table()
+    assert result.column_names == ['f1', 'f2', 'part']
+    assert len(result) == 2
+    assert result.equals(table.slice(0, 2))
+
+    # scanning row group fragment follows column projection / filter predicate
+    fragment = list(dataset.get_fragments(
+        columns=['part', 'f1'], filter=ds.field('f1') < 1))[0]
+    row_group_fragments = list(fragment.get_row_group_fragments())
+    assert len(row_group_fragments) == 1
+    result = row_group_fragments[0].to_table()
+    assert result.column_names == ['part', 'f1']
+    assert len(result) == 1
+
+
+@pytest.mark.pandas
+@pytest.mark.parquet
+def test_fragments_parquet_row_groups_reconstruct(tempdir):
+    table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2)
+
+    fragment = list(dataset.get_fragments())[0]
+    parquet_format = fragment.format
+    row_group_fragments = list(fragment.get_row_group_fragments())
+
+    # manually re-construct row group fragments
+    new_fragment = parquet_format.make_fragment(
+        fragment.path, fragment.filesystem, schema=dataset.schema,
+        partition_expression=fragment.partition_expression,
+        row_groups=[0])
+    result = new_fragment.to_table()
+    assert result.equals(row_group_fragments[0].to_table())
+
+    # manually re-construct a row group fragment with filter/column projection
+    new_fragment = parquet_format.make_fragment(
+        fragment.path, fragment.filesystem, schema=dataset.schema,
+        columns=['f1', 'part'], filter=ds.field('f1') < 3,
+        partition_expression=fragment.partition_expression,
         row_groups={1})
+    result = new_fragment.to_table()
+    assert result.column_names == ['f1', 'part']
+    assert len(result) == 1
+
+    # out of bounds row group index
+    new_fragment = parquet_format.make_fragment(
+        fragment.path, fragment.filesystem,
+        partition_expression=fragment.partition_expression,
+        row_groups={2})
+    with pytest.raises(IndexError, match="trying to scan row group 2"):
+        new_fragment.to_table()
 
 
 def test_partitioning_factory(mockfs):