You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by bk...@apache.org on 2020/04/02 15:33:17 UTC
[arrow] branch master updated: ARROW-8276: [C++][Dataset] Use
Scanner for Fragment.to_table
This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9ad2cc4 ARROW-8276: [C++][Dataset] Use Scanner for Fragment.to_table
9ad2cc4 is described below
commit 9ad2cc4c8844217e7e04980c7e2aff97f3a895b3
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Thu Apr 2 11:32:59 2020 -0400
ARROW-8276: [C++][Dataset] Use Scanner for Fragment.to_table
This way batches yielded by Fragment.Scan are projected and filtered.
Closes #6765 from bkietz/8276-Scanning-a-Fragment-does-
Lead-authored-by: Benjamin Kietzman <be...@gmail.com>
Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
cpp/src/arrow/dataset/file_base.cc | 21 +---
cpp/src/arrow/dataset/partition.cc | 13 +++
cpp/src/arrow/dataset/partition.h | 3 +
cpp/src/arrow/dataset/scanner.cc | 64 +++++------
cpp/src/arrow/dataset/scanner.h | 15 ++-
python/pyarrow/_dataset.pyx | 139 ++++++++++++----------
python/pyarrow/includes/libarrow_dataset.pxd | 25 ++--
python/pyarrow/tests/test_dataset.py | 165 +++++++++++++++++++++++++--
8 files changed, 303 insertions(+), 142 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 75df308..66e0e1d 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -190,25 +190,8 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
}
// if possible, extract a partition key and pass it to the projector
- auto projector = &options[ref.i]->projector;
- {
- int index = -1;
- std::shared_ptr<Scalar> value_to_materialize;
-
- DCHECK_OK(KeyValuePartitioning::VisitKeys(
- *partition, [&](const std::string& name, const std::shared_ptr<Scalar>& value) {
- if (index != -1) return Status::OK();
-
- index = projector->schema()->GetFieldIndex(name);
- if (index != -1) value_to_materialize = value;
-
- return Status::OK();
- }));
-
- if (index != -1) {
- RETURN_NOT_OK(projector->SetDefaultValue(index, std::move(value_to_materialize)));
- }
- }
+ RETURN_NOT_OK(KeyValuePartitioning::SetDefaultValuesFromKeys(
+ *partition, &options[ref.i]->projector));
if (ref.info().IsFile()) {
// generate a fragment for this file
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index ae64894..6ebd626 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -119,6 +119,19 @@ Status KeyValuePartitioning::VisitKeys(
checked_cast<const ScalarExpression*>(rhs)->value());
}
+Status KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
+ RecordBatchProjector* projector) {
+ return KeyValuePartitioning::VisitKeys(
+ expr, [projector](const std::string& name, const std::shared_ptr<Scalar>& value) {
+ ARROW_ASSIGN_OR_RAISE(auto match,
+ FieldRef(name).FindOneOrNone(*projector->schema()));
+ if (match.indices().empty()) {
+ return Status::OK();
+ }
+ return projector->SetDefaultValue(match, value);
+ });
+}
+
Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
const Key& key, const Schema& schema) {
ARROW_ASSIGN_OR_RAISE(auto field, FieldRef(key.name).GetOneOrNone(schema));
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index f8322a2..463d06c 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -159,6 +159,9 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
const std::function<Status(const std::string& name,
const std::shared_ptr<Scalar>& value)>& visitor);
+ static Status SetDefaultValuesFromKeys(const Expression& expr,
+ RecordBatchProjector* projector);
+
/// Convert a Key to a full expression.
/// If the field referenced in key is absent from the schema will be ignored.
static Result<std::shared_ptr<Expression>> ConvertKey(const Key& key,
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 4b0884e..346150e 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -60,43 +60,15 @@ std::vector<std::string> ScanOptions::MaterializedFields() const {
return fields;
}
-Result<std::shared_ptr<Table>> ScanTask::ToTable(
- const std::shared_ptr<ScanOptions>& options,
- const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_task_it) {
- std::mutex mutex;
- RecordBatchVector batches;
-
- auto task_group = context->TaskGroup();
-
- for (auto maybe_scan_task : scan_task_it) {
- ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
-
- task_group->Append([&batches, &mutex, scan_task] {
- ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
-
- for (auto maybe_batch : batch_it) {
- ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
- std::lock_guard<std::mutex> lock(mutex);
- batches.emplace_back(std::move(batch));
- }
-
- return Status::OK();
- });
- }
-
- // Wait for all tasks to complete, or the first error.
- RETURN_NOT_OK(task_group->Finish());
-
- std::shared_ptr<Table> out;
- RETURN_NOT_OK(Table::FromRecordBatches(options->schema(), batches, &out));
- return out;
-}
-
Result<RecordBatchIterator> InMemoryScanTask::Execute() {
return MakeVectorIterator(record_batches_);
}
FragmentIterator Scanner::GetFragments() {
+ if (fragment_ != nullptr) {
+ return MakeVectorIterator(FragmentVector{fragment_});
+ }
+
// Transform Datasets in a flat Iterator<Fragment>. This
// iterator is lazily constructed, i.e. Dataset::GetFragments is
// not invoked until a Fragment is requested.
@@ -186,7 +158,33 @@ std::shared_ptr<internal::TaskGroup> ScanContext::TaskGroup() const {
Result<std::shared_ptr<Table>> Scanner::ToTable() {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
- return ScanTask::ToTable(scan_options_, scan_context_, std::move(scan_task_it));
+ std::mutex mutex;
+ RecordBatchVector batches;
+
+ auto task_group = scan_context_->TaskGroup();
+
+ for (auto maybe_scan_task : scan_task_it) {
+ ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
+
+ task_group->Append([&batches, &mutex, scan_task] {
+ ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+
+ for (auto maybe_batch : batch_it) {
+ ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
+ std::lock_guard<std::mutex> lock(mutex);
+ batches.emplace_back(std::move(batch));
+ }
+
+ return Status::OK();
+ });
+ }
+
+ // Wait for all tasks to complete, or the first error.
+ RETURN_NOT_OK(task_group->Finish());
+
+ std::shared_ptr<Table> out;
+ RETURN_NOT_OK(Table::FromRecordBatches(scan_options_->schema(), batches, &out));
+ return out;
}
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 0304000..e94efd4 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -114,14 +114,6 @@ class ARROW_DS_EXPORT ScanTask {
const std::shared_ptr<ScanOptions>& options() const { return options_; }
const std::shared_ptr<ScanContext>& context() const { return context_; }
- /// \brief Convert a sequence of ScanTasks into a Table.
- ///
- /// Use this convenience utility with care. This will serially materialize the
- /// Scan result in memory before creating the Table.
- static Result<std::shared_ptr<Table>> ToTable(
- const std::shared_ptr<ScanOptions>& options,
- const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_tasks);
-
protected:
ScanTask(std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
: options_(std::move(options)), context_(std::move(context)) {}
@@ -166,6 +158,11 @@ class ARROW_DS_EXPORT Scanner {
scan_options_(std::move(scan_options)),
scan_context_(std::move(scan_context)) {}
+ Scanner(std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanContext> scan_context)
+ : fragment_(std::move(fragment)),
+ scan_options_(fragment_->scan_options()),
+ scan_context_(std::move(scan_context)) {}
+
/// \brief The Scan operator returns a stream of ScanTask. The caller is
/// responsible to dispatch/schedule said tasks. Tasks should be safe to run
/// in a concurrent fashion and outlive the iterator.
@@ -188,6 +185,8 @@ class ARROW_DS_EXPORT Scanner {
protected:
std::shared_ptr<Dataset> dataset_;
+ // TODO(ARROW-8065) remove fragment_ after a Dataset is constructible from fragments
+ std::shared_ptr<Fragment> fragment_;
std::shared_ptr<ScanOptions> scan_options_;
std::shared_ptr<ScanContext> scan_context_;
};
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 3970fea..556faa7 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -333,10 +333,43 @@ cdef class FileSystemDataset(Dataset):
"""The FileFormat of this source."""
return FileFormat.wrap(self.filesystem_dataset.format())
+cdef shared_ptr[CExpression] _insert_implicit_casts(Expression filter,
+ Schema schema) except *:
+ assert schema is not None
-def _empty_dataset_scanner(Schema schema not None, columns=None, filter=None):
- dataset = UnionDataset(schema, children=[])
- return Scanner(dataset, columns=columns, filter=filter)
+ if filter is None:
+ return ScalarExpression(True).unwrap()
+
+ return GetResultValue(
+ CInsertImplicitCasts(
+ deref(filter.unwrap().get()),
+ deref(pyarrow_unwrap_schema(schema).get())
+ )
+ )
+
+
+cdef shared_ptr[CScanOptions] _make_scan_options(
+ Schema schema, Expression partition_expression, object columns=None,
+ Expression filter=None) except *:
+ cdef:
+ shared_ptr[CScanOptions] options
+ CExpression* c_partition_expression
+
+ assert schema is not None
+ assert partition_expression is not None
+
+ filter = Expression.wrap(_insert_implicit_casts(filter, schema))
+ filter = filter.assume(partition_expression)
+
+ empty_dataset = UnionDataset(schema, children=[])
+ scanner = Scanner(empty_dataset, columns=columns, filter=filter)
+ options = scanner.unwrap().get().options()
+
+ c_partition_expression = partition_expression.unwrap().get()
+ check_status(CSetPartitionKeysInProjector(deref(c_partition_expression),
+ &options.get().projector))
+
+ return options
cdef class FileFormat:
@@ -371,6 +404,7 @@ cdef class FileFormat:
return self.wrapped
def inspect(self, str path not None, FileSystem filesystem not None):
+ """Infer the schema of a file."""
cdef:
shared_ptr[CSchema] c_schema
@@ -381,16 +415,20 @@ cdef class FileFormat:
def make_fragment(self, str path not None, FileSystem filesystem not None,
Schema schema=None, columns=None, filter=None,
Expression partition_expression=ScalarExpression(True)):
+ """
+ Make a FileFragment of this FileFormat. The filter may not reference
+ fields absent from the provided schema. If no schema is provided then
+ one will be inferred.
+ """
cdef:
shared_ptr[CScanOptions] c_options
shared_ptr[CFileFragment] c_fragment
- Scanner scanner
if schema is None:
schema = self.inspect(path, filesystem)
- scanner = _empty_dataset_scanner(schema, columns, filter)
- c_options = scanner.unwrap().get().options()
+ c_options = _make_scan_options(schema, partition_expression,
+ columns, filter)
c_fragment = GetResultValue(
self.format.MakeFragment(CFileSource(tobytes(path),
@@ -436,6 +474,14 @@ cdef class Fragment:
self.init(sp)
return self
+ cdef inline shared_ptr[CFragment] unwrap(self):
+ return self.wrapped
+
+ @property
+ def schema(self):
+ """Return the schema of batches scanned from this Fragment."""
+ return pyarrow_wrap_schema(self.fragment.schema())
+
@property
def partition_expression(self):
"""An Expression which evaluates to true for all data viewed by this
@@ -453,22 +499,8 @@ cdef class Fragment:
-------
table : Table
"""
- cdef:
- shared_ptr[CScanContext] context
- shared_ptr[CScanOptions] options
- CScanTaskIterator iterator
- shared_ptr[CTable] table
-
- options = self.fragment.scan_options()
-
- context = make_shared[CScanContext]()
- context.get().pool = maybe_unbox_memory_pool(memory_pool)
-
- iterator = move(GetResultValue(self.fragment.Scan(context)))
- table = GetResultValue(CScanTask.ToTable(options, context,
- move(iterator)))
-
- return pyarrow_wrap_table(table)
+ scanner = Scanner._from_fragment(self, use_threads, memory_pool)
+ return scanner.to_table()
def scan(self, MemoryPool memory_pool=None):
"""Returns a stream of ScanTasks
@@ -480,23 +512,7 @@ cdef class Fragment:
-------
scan_tasks : iterator of ScanTask
"""
- cdef:
- shared_ptr[CScanContext] context
- CScanTaskIterator iterator
- shared_ptr[CScanTask] task
-
- # create scan context
- context = make_shared[CScanContext]()
- context.get().pool = maybe_unbox_memory_pool(memory_pool)
-
- iterator = move(GetResultValue(self.fragment.Scan(move(context))))
-
- while True:
- task = GetResultValue(iterator.Next())
- if task.get() == nullptr:
- raise StopIteration()
- else:
- yield ScanTask.wrap(task)
+ return Scanner._from_fragment(self, memory_pool).scan()
cdef class FileFragment(Fragment):
@@ -581,10 +597,7 @@ cdef class ParquetFileFragment(FileFragment):
shared_ptr[CExpression] c_extra_filter
shared_ptr[CFragment] c_fragment
- if extra_filter is None:
- extra_filter = ScalarExpression(True)
- c_extra_filter = extra_filter.unwrap()
-
+ c_extra_filter = _insert_implicit_casts(extra_filter, self.schema)
c_format = <CParquetFileFormat*> self.file_fragment.format().get()
c_iterator = move(GetResultValue(c_format.GetRowGroupFragments(deref(
self.parquet_file_fragment), move(c_extra_filter))))
@@ -694,7 +707,6 @@ cdef class ParquetFileFormat(FileFormat):
cdef:
shared_ptr[CScanOptions] c_options
shared_ptr[CFileFragment] c_fragment
- Scanner scanner
vector[int] c_row_groups
if row_groups is None:
@@ -706,8 +718,8 @@ cdef class ParquetFileFormat(FileFormat):
if schema is None:
schema = self.inspect(path, filesystem)
- scanner = _empty_dataset_scanner(schema, columns, filter)
- c_options = scanner.unwrap().get().options()
+ c_options = _make_scan_options(schema, partition_expression,
+ columns, filter)
c_fragment = GetResultValue(
self.parquet_format.MakeFragment(CFileSource(tobytes(path),
@@ -1314,6 +1326,8 @@ cdef class Scanner:
# create scan context
context = make_shared[CScanContext]()
context.get().pool = maybe_unbox_memory_pool(memory_pool)
+ if use_threads is not None:
+ context.get().use_threads = use_threads
# create scanner builder
builder = GetResultValue(
@@ -1322,18 +1336,11 @@ cdef class Scanner:
# set the builder's properties
if columns is not None:
- columns_to_project = [tobytes(c) for c in columns]
- check_status(builder.get().Project(columns_to_project))
- if filter is not None:
- filter_expression = GetResultValue(
- CInsertImplicitCasts(
- deref(filter.unwrap().get()),
- deref(builder.get().schema().get())
- )
- )
- check_status(builder.get().Filter(filter_expression))
- if use_threads is not None:
- check_status(builder.get().UseThreads(use_threads))
+ check_status(builder.get().Project([tobytes(c) for c in columns]))
+
+ check_status(builder.get().Filter(_insert_implicit_casts(
+ filter, pyarrow_wrap_schema(builder.get().schema())
+ )))
check_status(builder.get().BatchSize(batch_size))
@@ -1341,16 +1348,28 @@ cdef class Scanner:
scanner = GetResultValue(builder.get().Finish())
self.init(scanner)
- cdef void init(self, shared_ptr[CScanner]& sp):
+ cdef void init(self, const shared_ptr[CScanner]& sp):
self.wrapped = sp
self.scanner = sp.get()
@staticmethod
- cdef wrap(shared_ptr[CScanner]& sp):
+ cdef wrap(const shared_ptr[CScanner]& sp):
cdef Scanner self = Scanner.__new__(Scanner)
self.init(sp)
return self
+ @staticmethod
+ def _from_fragment(Fragment fragment not None, bint use_threads=True,
+ MemoryPool memory_pool=None):
+ cdef:
+ shared_ptr[CScanContext] context
+
+ context = make_shared[CScanContext]()
+ context.get().pool = maybe_unbox_memory_pool(memory_pool)
+ context.get().use_threads = use_threads
+
+ return Scanner.wrap(make_shared[CScanner](fragment.unwrap(), context))
+
cdef inline shared_ptr[CScanner] unwrap(self):
return self.wrapped
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 7118f4d..467a84c 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -132,10 +132,14 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
"arrow::dataset::InsertImplicitCasts"(
const CExpression &, const CSchema&)
- cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
+ cdef cppclass CRecordBatchProjector "arrow::dataset::RecordBatchProjector":
pass
+ cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
+ CRecordBatchProjector projector
+
cdef cppclass CScanContext "arrow::dataset::ScanContext":
+ c_bool use_threads
CMemoryPool * pool
ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \
@@ -143,18 +147,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CScanTask" arrow::dataset::ScanTask":
CResult[CRecordBatchIterator] Execute()
- @staticmethod
- CResult[shared_ptr[CTable]] ToTable(
- const shared_ptr[CScanOptions]&,
- const shared_ptr[CScanContext]&,
- CScanTaskIterator)
cdef cppclass CFragment "arrow::dataset::Fragment":
- const shared_ptr[CScanOptions]& scan_options() const
CResult[CScanTaskIterator] Scan(shared_ptr[CScanContext] context)
- c_bool splittable()
- c_string type_name()
- const shared_ptr[CExpression]& partition_expression()
+ const shared_ptr[CSchema]& schema() const
+ c_bool splittable() const
+ c_string type_name() const
+ const shared_ptr[CExpression]& partition_expression() const
ctypedef vector[shared_ptr[CFragment]] CFragmentVector \
"arrow::dataset::FragmentVector"
@@ -163,6 +162,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
"arrow::dataset::FragmentIterator"
cdef cppclass CScanner "arrow::dataset::Scanner":
+ CScanner(shared_ptr[CFragment], shared_ptr[CScanContext])
CResult[CScanTaskIterator] Scan()
CResult[shared_ptr[CTable]] ToTable()
CFragmentIterator GetFragments()
@@ -309,6 +309,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
shared_ptr[CPartitioning] partitioning() const
shared_ptr[CPartitioningFactory] factory() const
+ cdef CStatus CSetPartitionKeysInProjector \
+ "arrow::dataset::KeyValuePartitioning::SetDefaultValuesFromKeys"(
+ const CExpression& partition_expression,
+ CRecordBatchProjector* projector)
+
cdef cppclass CFileSystemFactoryOptions \
"arrow::dataset::FileSystemFactoryOptions":
CPartitioningOrFactory partitioning
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 9d46acf..037ba52 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -621,29 +621,170 @@ def test_make_fragment(multisourcefs):
assert row_group_fragment.row_groups == {0}
-@pytest.mark.pandas
-def test_parquet_row_group_fragments(tempdir):
- import pyarrow as pa
+def _create_dataset_for_fragments(tempdir, chunk_size=None):
import pyarrow.parquet as pq
- table = pa.table({'a': ['a', 'a', 'b', 'b'], 'b': [1, 2, 3, 4]})
-
+ table = pa.table(
+ {'f1': range(8), 'f2': [1] * 8, 'part': ['a'] * 4 + ['b'] * 4}
+ )
# write_to_dataset currently requires pandas
pq.write_to_dataset(table, str(tempdir / "test_parquet_dataset"),
- partition_cols=["a"])
+ partition_cols=["part"], chunk_size=chunk_size)
- import pyarrow.dataset as ds
dataset = ds.dataset(str(tempdir / "test_parquet_dataset/"),
format="parquet", partitioning="hive")
+ return table, dataset
+
+
+@pytest.mark.pandas
+@pytest.mark.parquet
+def test_fragments(tempdir):
+ table, dataset = _create_dataset_for_fragments(tempdir)
+ # list fragments
fragments = list(dataset.get_fragments())
+ assert len(fragments) == 2
f = fragments[0]
- parquet_format = f.format
- parquet_format.make_fragment(f.path, f.filesystem,
- partition_expression=f.partition_expression)
- parquet_format.make_fragment(
- f.path, f.filesystem, partition_expression=f.partition_expression,
+
+ # file's schema does not include partition column
+ phys_schema = f.schema.remove(f.schema.get_field_index('part'))
+ assert f.format.inspect(f.path, f.filesystem) == phys_schema
+ assert f.partition_expression.equals(ds.field('part') == 'a')
+
+ # scanning fragment includes partition columns
+ result = f.to_table()
+ assert f.schema == result.schema
+ assert result.column_names == ['f1', 'f2', 'part']
+ assert len(result) == 4
+ assert result.equals(table.slice(0, 4))
+
+ # scanning fragments follow column projection
+ fragments = list(dataset.get_fragments(columns=['f1', 'part']))
+ assert len(fragments) == 2
+ result = fragments[0].to_table()
+ assert result.column_names == ['f1', 'part']
+ assert len(result) == 4
+
+ # scanning fragments follow filter predicate
+ fragments = list(dataset.get_fragments(filter=ds.field('f1') < 2))
+ assert len(fragments) == 2
+ result = fragments[0].to_table()
+ assert result.column_names == ['f1', 'f2', 'part']
+ assert len(result) == 2
+ result = fragments[1].to_table()
+ assert len(result) == 0
+
+
+@pytest.mark.pandas
+@pytest.mark.parquet
+def test_fragments_reconstruct(tempdir):
+ table, dataset = _create_dataset_for_fragments(tempdir)
+
+ def assert_yields_projected(fragment, row_slice, columns):
+ actual = fragment.to_table()
+ assert actual.column_names == columns
+
+ expected = table.slice(*row_slice).to_pandas()[[*columns]]
+ assert actual.equals(pa.Table.from_pandas(expected))
+
+ fragment = list(dataset.get_fragments())[0]
+ parquet_format = fragment.format
+
+ # manually re-construct a fragment, with explicit schema
+ new_fragment = parquet_format.make_fragment(
+ fragment.path, fragment.filesystem, schema=dataset.schema,
+ partition_expression=fragment.partition_expression)
+ assert new_fragment.to_table().equals(fragment.to_table())
+ assert_yields_projected(new_fragment, (0, 4), table.column_names)
+
+ # filter / column projection, inspected schema
+ new_fragment = parquet_format.make_fragment(
+ fragment.path, fragment.filesystem,
+ columns=['f1'], filter=ds.field('f1') < 2,
+ partition_expression=fragment.partition_expression)
+ assert_yields_projected(new_fragment, (0, 2), ['f1'])
+
+ # filter requiring cast / column projection, inspected schema
+ new_fragment = parquet_format.make_fragment(
+ fragment.path, fragment.filesystem,
+ columns=['f1'], filter=ds.field('f1') < 2.0,
+ partition_expression=fragment.partition_expression)
+ assert_yields_projected(new_fragment, (0, 2), ['f1'])
+
+ # filter on the partition column, explicit schema
+ new_fragment = parquet_format.make_fragment(
+ fragment.path, fragment.filesystem, schema=dataset.schema,
+ filter=ds.field('part') == 'a',
+ partition_expression=fragment.partition_expression)
+ assert_yields_projected(new_fragment, (0, 4), table.column_names)
+
+ # filter on the partition column, inspected schema
+ with pytest.raises(ValueError, match="Field named 'part' not found"):
+ new_fragment = parquet_format.make_fragment(
+ fragment.path, fragment.filesystem,
+ filter=ds.field('part') == 'a',
+ partition_expression=fragment.partition_expression)
+
+
+@pytest.mark.pandas
+@pytest.mark.parquet
+def test_fragments_parquet_row_groups(tempdir):
+ table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2)
+
+ fragment = list(dataset.get_fragments())[0]
+
+ # list and scan row group fragments
+ row_group_fragments = list(fragment.get_row_group_fragments())
+ assert len(row_group_fragments) == 2
+ result = row_group_fragments[0].to_table()
+ assert result.column_names == ['f1', 'f2', 'part']
+ assert len(result) == 2
+ assert result.equals(table.slice(0, 2))
+
+ # scanning row group fragment follows column projection / filter predicate
+ fragment = list(dataset.get_fragments(
+ columns=['part', 'f1'], filter=ds.field('f1') < 1))[0]
+ row_group_fragments = list(fragment.get_row_group_fragments())
+ assert len(row_group_fragments) == 1
+ result = row_group_fragments[0].to_table()
+ assert result.column_names == ['part', 'f1']
+ assert len(result) == 1
+
+
+@pytest.mark.pandas
+@pytest.mark.parquet
+def test_fragments_parquet_row_groups_reconstruct(tempdir):
+ table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2)
+
+ fragment = list(dataset.get_fragments())[0]
+ parquet_format = fragment.format
+ row_group_fragments = list(fragment.get_row_group_fragments())
+
+ # manually re-construct row group fragments
+ new_fragment = parquet_format.make_fragment(
+ fragment.path, fragment.filesystem, schema=dataset.schema,
+ partition_expression=fragment.partition_expression,
+ row_groups=[0])
+ result = new_fragment.to_table()
+ assert result.equals(row_group_fragments[0].to_table())
+
+ # manually re-construct a row group fragment with filter/column projection
+ new_fragment = parquet_format.make_fragment(
+ fragment.path, fragment.filesystem, schema=dataset.schema,
+ columns=['f1', 'part'], filter=ds.field('f1') < 3,
+ partition_expression=fragment.partition_expression,
row_groups={1})
+ result = new_fragment.to_table()
+ assert result.column_names == ['f1', 'part']
+ assert len(result) == 1
+
+ # out of bounds row group index
+ new_fragment = parquet_format.make_fragment(
+ fragment.path, fragment.filesystem,
+ partition_expression=fragment.partition_expression,
+ row_groups={2})
+ with pytest.raises(IndexError, match="trying to scan row group 2"):
+ new_fragment.to_table()
def test_partitioning_factory(mockfs):