You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fs...@apache.org on 2020/03/27 16:52:57 UTC
[arrow] branch master updated: ARROW-8061: [C++][Dataset] Provide RowGroup fragments for ParquetFileFormat
This is an automated email from the ASF dual-hosted git repository.
fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2ca1706 ARROW-8061: [C++][Dataset] Provide RowGroup fragments for ParquetFileFormat
2ca1706 is described below
commit 2ca1706625eb73b75e004f87eaf6bc038f2af813
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Fri Mar 27 12:52:33 2020 -0400
ARROW-8061: [C++][Dataset] Provide RowGroup fragments for ParquetFileFormat
Provides ParquetFileFragment, which may view a subset of row groups within a Parquet file. The indices of the viewed row groups are available through the `row_groups()` property, which is exposed to Python. Construction of subset-viewing ParquetFileFragments is not yet exposed to Python.
Closes #6670 from bkietz/8061-Ability-to-specify-granul
Authored-by: Benjamin Kietzman <be...@gmail.com>
Signed-off-by: François Saint-Jacques <fs...@gmail.com>
---
cpp/src/arrow/dataset/file_base.cc | 4 +-
cpp/src/arrow/dataset/file_base.h | 11 +-
cpp/src/arrow/dataset/file_parquet.cc | 103 +++++++++++--
cpp/src/arrow/dataset/file_parquet.h | 49 ++++++
cpp/src/arrow/dataset/file_parquet_test.cc | 142 ++++++++++++++++--
cpp/src/arrow/dataset/filter.cc | 2 +
cpp/src/arrow/dataset/scanner.cc | 92 +++++-------
cpp/src/arrow/dataset/scanner.h | 8 +
cpp/src/arrow/dataset/scanner_internal.h | 29 +++-
cpp/src/arrow/dataset/test_util.h | 2 +-
cpp/src/arrow/dataset/type_fwd.h | 9 ++
cpp/src/arrow/filesystem/filesystem.h | 2 +-
python/pyarrow/_dataset.pyx | 214 ++++++++++++++++++++++++++-
python/pyarrow/dataset.py | 1 +
python/pyarrow/includes/libarrow_dataset.pxd | 44 +++++-
python/pyarrow/includes/libarrow_fs.pxd | 1 +
python/pyarrow/tests/test_dataset.py | 111 ++++++++++----
17 files changed, 683 insertions(+), 141 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 1da3dd7..75df308 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -63,8 +63,8 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<Expression> partition_expression) {
- return std::make_shared<FileFragment>(std::move(source), shared_from_this(), options,
- std::move(partition_expression));
+ return std::shared_ptr<FileFragment>(new FileFragment(
+ std::move(source), shared_from_this(), options, std::move(partition_expression)));
}
Result<std::shared_ptr<WriteTask>> FileFormat::WriteFragment(
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 8571eca..157a425 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -162,19 +162,13 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
public:
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;
- // XXX should this include format_->type_name?
- std::string type_name() const override { return "file"; }
+ std::string type_name() const override { return format_->type_name(); }
bool splittable() const override { return format_->splittable(); }
const FileSource& source() const { return source_; }
const std::shared_ptr<FileFormat>& format() const { return format_; }
- FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
- std::shared_ptr<ScanOptions> scan_options)
- : Fragment(std::move(scan_options)),
- source_(std::move(source)),
- format_(std::move(format)) {}
-
+ protected:
FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
@@ -182,7 +176,6 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
source_(std::move(source)),
format_(std::move(format)) {}
- protected:
FileSource source_;
std::shared_ptr<FileFormat> format_;
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index aa87691..107f9c4 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -183,26 +183,29 @@ class RowGroupSkipper {
RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
parquet::ArrowReaderProperties arrow_properties,
- std::shared_ptr<Expression> filter)
+ std::shared_ptr<Expression> filter, std::vector<int> row_groups)
: metadata_(std::move(metadata)),
arrow_properties_(std::move(arrow_properties)),
filter_(std::move(filter)),
- row_group_idx_(0) {
- num_row_groups_ = metadata_->num_row_groups();
- }
+ row_group_idx_(0),
+ row_groups_(std::move(row_groups)),
+ num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups()
+ : static_cast<int>(row_groups_.size())) {}
int Next() {
while (row_group_idx_ < num_row_groups_) {
- const auto row_group_idx = row_group_idx_++;
- const auto row_group = metadata_->RowGroup(row_group_idx);
+ const int row_group =
+ row_groups_.empty() ? row_group_idx_++ : row_groups_[row_group_idx_++];
+
+ const auto row_group_metadata = metadata_->RowGroup(row_group);
- const auto num_rows = row_group->num_rows();
- if (CanSkip(*row_group)) {
+ const int64_t num_rows = row_group_metadata->num_rows();
+ if (CanSkip(*row_group_metadata)) {
rows_skipped_ += num_rows;
continue;
}
- return row_group_idx;
+ return row_group;
}
return kIterationDone;
@@ -225,6 +228,7 @@ class RowGroupSkipper {
parquet::ArrowReaderProperties arrow_properties_;
std::shared_ptr<Expression> filter_;
int row_group_idx_;
+ std::vector<int> row_groups_;
int num_row_groups_;
int64_t rows_skipped_;
};
@@ -234,7 +238,8 @@ class ParquetScanTaskIterator {
static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context,
std::unique_ptr<parquet::ParquetFileReader> reader,
- parquet::ArrowReaderProperties arrow_properties) {
+ parquet::ArrowReaderProperties arrow_properties,
+ const std::vector<int>& row_groups) {
auto metadata = reader->metadata();
auto column_projection = InferColumnProjection(*metadata, arrow_properties, options);
@@ -244,7 +249,7 @@ class ParquetScanTaskIterator {
arrow_properties, &arrow_reader));
RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
- options->filter);
+ options->filter, row_groups);
return ScanTaskIterator(ParquetScanTaskIterator(
std::move(options), std::move(context), std::move(column_projection),
@@ -373,12 +378,86 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
const FileSource& source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) const {
+ return ScanFile(source, std::move(options), std::move(context), {});
+}
+
+Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
+ const FileSource& source, std::shared_ptr<ScanOptions> options,
+ std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
auto properties = MakeReaderProperties(*this, context->pool);
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+ for (int i : row_groups) {
+ if (i >= reader->metadata()->num_row_groups()) {
+ return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
+ " only has ", reader->metadata()->num_row_groups(),
+ " row groups");
+ }
+ }
+
auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
- std::move(reader), std::move(arrow_properties));
+ std::move(reader), std::move(arrow_properties),
+ row_groups);
+}
+
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+ FileSource source, std::shared_ptr<ScanOptions> options,
+ std::shared_ptr<Expression> partition_expression, std::vector<int> row_groups) {
+ return std::shared_ptr<FileFragment>(
+ new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options),
+ std::move(partition_expression), std::move(row_groups)));
+}
+
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+ FileSource source, std::shared_ptr<ScanOptions> options,
+ std::shared_ptr<Expression> partition_expression) {
+ return std::shared_ptr<FileFragment>(
+ new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options),
+ std::move(partition_expression), {}));
+}
+
+Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
+ const ParquetFileFragment& fragment, std::shared_ptr<Expression> extra_filter) {
+ auto properties = MakeReaderProperties(*this);
+ ARROW_ASSIGN_OR_RAISE(auto reader,
+ OpenReader(fragment.source(), std::move(properties)));
+
+ auto arrow_properties =
+ MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
+ auto metadata = reader->metadata();
+
+ auto row_groups = fragment.row_groups();
+ if (row_groups.empty()) {
+ row_groups = internal::Iota(metadata->num_row_groups());
+ }
+ FragmentVector fragments(row_groups.size());
+
+ auto new_options = std::make_shared<ScanOptions>(*fragment.scan_options());
+ if (!extra_filter->Equals(true)) {
+ new_options->filter = and_(std::move(extra_filter), std::move(new_options->filter));
+ }
+
+ RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
+ new_options->filter, std::move(row_groups));
+
+ for (int i = 0, row_group = skipper.Next();
+ row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
+ ARROW_ASSIGN_OR_RAISE(fragments[i++],
+ MakeFragment(fragment.source(), new_options,
+ fragment.partition_expression(), {row_group}));
+ }
+
+ return MakeVectorIterator(std::move(fragments));
+}
+
+Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanContext> context) {
+ return parquet_format().ScanFile(source_, scan_options_, std::move(context),
+ row_groups_);
+}
+
+const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
+ return internal::checked_cast<const ParquetFileFormat&>(*format_);
}
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index e270c78..3192d0d 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -21,6 +21,7 @@
#include <string>
#include <unordered_set>
#include <utility>
+#include <vector>
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
@@ -86,6 +87,54 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
Result<ScanTaskIterator> ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) const override;
+
+ /// \brief Open a file for scanning, restricted to the specified row groups.
+ Result<ScanTaskIterator> ScanFile(const FileSource& source,
+ std::shared_ptr<ScanOptions> options,
+ std::shared_ptr<ScanContext> context,
+ const std::vector<int>& row_groups) const;
+
+ using FileFormat::MakeFragment;
+
+ Result<std::shared_ptr<FileFragment>> MakeFragment(
+ FileSource source, std::shared_ptr<ScanOptions> options,
+ std::shared_ptr<Expression> partition_expression) override;
+
+ /// \brief Create a Fragment, restricted to the specified row groups.
+ Result<std::shared_ptr<FileFragment>> MakeFragment(
+ FileSource source, std::shared_ptr<ScanOptions> options,
+ std::shared_ptr<Expression> partition_expression, std::vector<int> row_groups);
+
+ /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+ /// Row groups whose metadata contradicts the fragment's filter or the extra_filter
+ /// will be excluded.
+ Result<FragmentIterator> GetRowGroupFragments(
+ const ParquetFileFragment& fragment,
+ std::shared_ptr<Expression> extra_filter = scalar(true));
+};
+
+class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
+ public:
+ Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;
+
+ /// \brief The row groups viewed by this Fragment. This may be empty which signifies all
+ /// row groups are selected.
+ const std::vector<int>& row_groups() const { return row_groups_; }
+
+ private:
+ ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
+ std::shared_ptr<ScanOptions> scan_options,
+ std::shared_ptr<Expression> partition_expression,
+ std::vector<int> row_groups)
+ : FileFragment(std::move(source), std::move(format), std::move(scan_options),
+ std::move(partition_expression)),
+ row_groups_(std::move(row_groups)) {}
+
+ const ParquetFileFormat& parquet_format() const;
+
+ std::vector<int> row_groups_;
+
+ friend class ParquetFileFormat;
};
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 9a3dcad..9a7c8fb 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -29,7 +29,9 @@
#include "arrow/testing/util.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
+#include "arrow/util/range.h"
#include "parquet/arrow/writer.h"
+#include "parquet/metadata.h"
namespace arrow {
namespace dataset {
@@ -50,6 +52,8 @@ using parquet::arrow::WriteTable;
using testing::Pointee;
+using internal::checked_pointer_cast;
+
class ArrowParquetWriterMixin : public ::testing::Test {
public:
Status WriteRecordBatch(const RecordBatch& batch, FileWriter* writer) {
@@ -149,6 +153,12 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
return Batches(std::move(scan_task_it));
}
+ std::shared_ptr<RecordBatch> SingleBatch(Fragment* fragment) {
+ auto batches = IteratorToVector(Batches(fragment));
+ EXPECT_EQ(batches.size(), 1);
+ return batches.front();
+ }
+
void CountRowsAndBatchesInScan(Fragment* fragment, int64_t expected_rows,
int64_t expected_batches) {
int64_t actual_rows = 0;
@@ -164,6 +174,34 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
EXPECT_EQ(actual_batches, expected_batches);
}
+ void CountRowsAndBatchesInScan(const std::shared_ptr<Fragment>& fragment,
+ int64_t expected_rows, int64_t expected_batches) {
+ return CountRowsAndBatchesInScan(fragment.get(), expected_rows, expected_batches);
+ }
+
+ void CountRowGroupsInFragment(const std::shared_ptr<Fragment>& fragment,
+ std::vector<int> expected_row_groups,
+ const Expression& filter,
+ const Expression& extra_filter = *scalar(true)) {
+ fragment->scan_options()->filter = filter.Copy();
+
+ auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
+ ASSERT_OK_AND_ASSIGN(
+ auto row_group_fragments,
+ format_->GetRowGroupFragments(*parquet_fragment, extra_filter.Copy()));
+
+ auto expected_row_group = expected_row_groups.begin();
+ for (auto maybe_fragment : row_group_fragments) {
+ ASSERT_OK_AND_ASSIGN(auto fragment, std::move(maybe_fragment));
+ auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
+
+ auto i = *expected_row_group++;
+ EXPECT_EQ(parquet_fragment->row_groups(), std::vector<int>{i});
+
+ EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), i + 1);
+ }
+ }
+
protected:
std::shared_ptr<Schema> schema_ = schema({field("f64", float64())});
std::shared_ptr<ParquetFileFormat> format_ = std::make_shared<ParquetFileFormat>();
@@ -369,34 +407,118 @@ TEST_F(TestParquetFileFormat, PredicatePushdown) {
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));
opts_->filter = scalar(true);
- CountRowsAndBatchesInScan(fragment.get(), kTotalNumRows, kNumRowGroups);
+ CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups);
for (int64_t i = 1; i <= kNumRowGroups; i++) {
opts_->filter = ("i64"_ == int64_t(i)).Copy();
- CountRowsAndBatchesInScan(fragment.get(), i, 1);
+ CountRowsAndBatchesInScan(fragment, i, 1);
}
- /* Out of bound filters should skip all RowGroups. */
+ // Out of bound filters should skip all RowGroups.
opts_->filter = scalar(false);
- CountRowsAndBatchesInScan(fragment.get(), 0, 0);
+ CountRowsAndBatchesInScan(fragment, 0, 0);
opts_->filter = ("i64"_ == int64_t(kNumRowGroups + 1)).Copy();
- CountRowsAndBatchesInScan(fragment.get(), 0, 0);
+ CountRowsAndBatchesInScan(fragment, 0, 0);
opts_->filter = ("i64"_ == int64_t(-1)).Copy();
- CountRowsAndBatchesInScan(fragment.get(), 0, 0);
+ CountRowsAndBatchesInScan(fragment, 0, 0);
// No rows match 1 and 2.
opts_->filter = ("i64"_ == int64_t(1) and "u8"_ == uint8_t(2)).Copy();
- CountRowsAndBatchesInScan(fragment.get(), 0, 0);
+ CountRowsAndBatchesInScan(fragment, 0, 0);
opts_->filter = ("i64"_ == int64_t(2) or "i64"_ == int64_t(4)).Copy();
- CountRowsAndBatchesInScan(fragment.get(), 2 + 4, 2);
+ CountRowsAndBatchesInScan(fragment, 2 + 4, 2);
opts_->filter = ("i64"_ < int64_t(6)).Copy();
- CountRowsAndBatchesInScan(fragment.get(), 5 * (5 + 1) / 2, 5);
+ CountRowsAndBatchesInScan(fragment, 5 * (5 + 1) / 2, 5);
opts_->filter = ("i64"_ >= int64_t(6)).Copy();
- CountRowsAndBatchesInScan(fragment.get(), kTotalNumRows - (5 * (5 + 1) / 2),
+ CountRowsAndBatchesInScan(fragment, kTotalNumRows - (5 * (5 + 1) / 2),
kNumRowGroups - 5);
}
+TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
+ constexpr int64_t kNumRowGroups = 16;
+ constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
+
+ auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
+ auto source = GetFileSource(reader.get());
+
+ opts_ = ScanOptions::Make(reader->schema());
+ ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));
+
+ CountRowGroupsInFragment(fragment, internal::Iota(static_cast<int>(kTotalNumRows)),
+ *scalar(true));
+
+ for (int i = 0; i < kNumRowGroups; ++i) {
+ CountRowGroupsInFragment(fragment, {i}, "i64"_ == int64_t(i + 1));
+ }
+
+ // Out of bound filters should skip all RowGroups.
+ CountRowGroupsInFragment(fragment, {}, *scalar(false));
+ CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(kNumRowGroups + 1));
+ CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(-1));
+
+ // No rows match 1 and 2.
+ CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(1) and "u8"_ == uint8_t(2));
+ CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(2), "i64"_ == int64_t(4));
+
+ CountRowGroupsInFragment(fragment, {1, 3},
+ "i64"_ == int64_t(2) or "i64"_ == int64_t(4));
+
+ CountRowGroupsInFragment(fragment, {0, 1, 2, 3, 4}, "i64"_ < int64_t(6));
+
+ CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast<int>(kNumRowGroups)),
+ "i64"_ >= int64_t(6));
+
+ CountRowGroupsInFragment(fragment, {5, 6, 7}, "i64"_ >= int64_t(6),
+ "i64"_ < int64_t(8));
+}
+
+TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) {
+ constexpr int64_t kNumRowGroups = 16;
+ constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
+
+ auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
+ auto source = GetFileSource(reader.get());
+
+ opts_ = ScanOptions::Make(reader->schema());
+
+ auto row_groups_fragment = [&](std::vector<int> row_groups) {
+ EXPECT_OK_AND_ASSIGN(auto fragment,
+ format_->MakeFragment(*source, opts_, scalar(true), row_groups));
+ return internal::checked_pointer_cast<ParquetFileFragment>(fragment);
+ };
+
+ // empty selection is identical to selecting all row groups
+ EXPECT_TRUE(row_groups_fragment({})->row_groups().empty());
+ CountRowsAndBatchesInScan(row_groups_fragment({}), kTotalNumRows, kNumRowGroups);
+
+ // individual selection selects a single row group
+ for (int i = 0; i < kNumRowGroups; ++i) {
+ CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1);
+ EXPECT_EQ(row_groups_fragment({i})->row_groups(), std::vector<int>{i});
+ }
+
+ for (int i = 0; i < kNumRowGroups; ++i) {
+ // conflicting selection/filter
+ opts_->filter = ("i64"_ == int64_t(i)).Copy();
+ CountRowsAndBatchesInScan(row_groups_fragment({i}), 0, 0);
+ }
+
+ for (int i = 0; i < kNumRowGroups; ++i) {
+ // identical selection/filter
+ opts_->filter = ("i64"_ == int64_t(i + 1)).Copy();
+ CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1);
+ }
+
+ opts_->filter = ("i64"_ > int64_t(3)).Copy();
+ CountRowsAndBatchesInScan(row_groups_fragment({2, 3, 4, 5}), 4 + 5 + 6, 3);
+
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ IndexError,
+ testing::HasSubstr("only has " + std::to_string(kNumRowGroups) + " row groups"),
+ row_groups_fragment({kNumRowGroups + 1})->Scan(ctx_));
+}
+
} // namespace dataset
} // namespace arrow
diff --git a/cpp/src/arrow/dataset/filter.cc b/cpp/src/arrow/dataset/filter.cc
index 5004e5a..ca5c3a7 100644
--- a/cpp/src/arrow/dataset/filter.cc
+++ b/cpp/src/arrow/dataset/filter.cc
@@ -1260,5 +1260,7 @@ Result<std::shared_ptr<RecordBatch>> TreeEvaluator::Filter(
return batch->Slice(0, 0);
}
+std::shared_ptr<ScalarExpression> scalar(bool value) { return scalar(MakeScalar(value)); }
+
} // namespace dataset
} // namespace arrow
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 31a1380..4b0884e 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -60,24 +60,40 @@ std::vector<std::string> ScanOptions::MaterializedFields() const {
return fields;
}
-Result<RecordBatchIterator> InMemoryScanTask::Execute() {
- return MakeVectorIterator(record_batches_);
-}
+Result<std::shared_ptr<Table>> ScanTask::ToTable(
+ const std::shared_ptr<ScanOptions>& options,
+ const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_task_it) {
+ std::mutex mutex;
+ RecordBatchVector batches;
-/// \brief GetScanTaskIterator transforms an Iterator<Fragment> in a
-/// flattened Iterator<ScanTask>.
-static ScanTaskIterator GetScanTaskIterator(FragmentIterator fragments,
- std::shared_ptr<ScanContext> context) {
- // Fragment -> ScanTaskIterator
- auto fn = [context](std::shared_ptr<Fragment> fragment) {
- return fragment->Scan(context);
- };
+ auto task_group = context->TaskGroup();
+
+ for (auto maybe_scan_task : scan_task_it) {
+ ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
+
+ task_group->Append([&batches, &mutex, scan_task] {
+ ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+
+ for (auto maybe_batch : batch_it) {
+ ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
+ std::lock_guard<std::mutex> lock(mutex);
+ batches.emplace_back(std::move(batch));
+ }
- // Iterator<Iterator<ScanTask>>
- auto maybe_scantask_it = MakeMaybeMapIterator(fn, std::move(fragments));
+ return Status::OK();
+ });
+ }
+
+ // Wait for all tasks to complete, or the first error.
+ RETURN_NOT_OK(task_group->Finish());
- // Iterator<ScanTask>
- return MakeFlattenIterator(std::move(maybe_scantask_it));
+ std::shared_ptr<Table> out;
+ RETURN_NOT_OK(Table::FromRecordBatches(options->schema(), batches, &out));
+ return out;
+}
+
+Result<RecordBatchIterator> InMemoryScanTask::Execute() {
+ return MakeVectorIterator(record_batches_);
}
FragmentIterator Scanner::GetFragments() {
@@ -168,51 +184,9 @@ std::shared_ptr<internal::TaskGroup> ScanContext::TaskGroup() const {
return internal::TaskGroup::MakeSerial();
}
-struct TableAggregator {
- void Append(std::shared_ptr<RecordBatch> batch) {
- std::lock_guard<std::mutex> lock(m);
- batches.emplace_back(std::move(batch));
- }
-
- Result<std::shared_ptr<Table>> Finish(const std::shared_ptr<Schema>& schema) {
- std::shared_ptr<Table> out;
- RETURN_NOT_OK(Table::FromRecordBatches(schema, batches, &out));
- return out;
- }
-
- std::mutex m;
- std::vector<std::shared_ptr<RecordBatch>> batches;
-};
-
-struct ScanTaskPromise {
- Status operator()() {
- ARROW_ASSIGN_OR_RAISE(auto it, task->Execute());
- for (auto maybe_batch : it) {
- ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
- aggregator.Append(std::move(batch));
- }
-
- return Status::OK();
- }
-
- TableAggregator& aggregator;
- std::shared_ptr<ScanTask> task;
-};
-
Result<std::shared_ptr<Table>> Scanner::ToTable() {
- auto task_group = scan_context_->TaskGroup();
-
- TableAggregator aggregator;
- ARROW_ASSIGN_OR_RAISE(auto it, Scan());
- for (auto maybe_scan_task : it) {
- ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
- task_group->Append(ScanTaskPromise{aggregator, std::move(scan_task)});
- }
-
- // Wait for all tasks to complete, or the first error.
- RETURN_NOT_OK(task_group->Finish());
-
- return aggregator.Finish(scan_options_->schema());
+ ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
+ return ScanTask::ToTable(scan_options_, scan_context_, std::move(scan_task_it));
}
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 05b8e43..0304000 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -114,6 +114,14 @@ class ARROW_DS_EXPORT ScanTask {
const std::shared_ptr<ScanOptions>& options() const { return options_; }
const std::shared_ptr<ScanContext>& context() const { return context_; }
+ /// \brief Convert a sequence of ScanTasks into a Table.
+ ///
+ /// Use this convenience utility with care. This will serially materialize the
+ /// Scan result in memory before creating the Table.
+ static Result<std::shared_ptr<Table>> ToTable(
+ const std::shared_ptr<ScanOptions>& options,
+ const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_tasks);
+
protected:
ScanTask(std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
: options_(std::move(options)), context_(std::move(context)) {}
diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h
index d299fa9..0cbd4a5 100644
--- a/cpp/src/arrow/dataset/scanner_internal.h
+++ b/cpp/src/arrow/dataset/scanner_internal.h
@@ -27,10 +27,9 @@
namespace arrow {
namespace dataset {
-static inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
- const ExpressionEvaluator& evaluator,
- const Expression& filter,
- MemoryPool* pool) {
+inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
+ const ExpressionEvaluator& evaluator,
+ const Expression& filter, MemoryPool* pool) {
return MakeMaybeMapIterator(
[&filter, &evaluator, pool](std::shared_ptr<RecordBatch> in) {
return evaluator.Evaluate(filter, *in, pool).Map([&](compute::Datum selection) {
@@ -40,9 +39,9 @@ static inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
std::move(it));
}
-static inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
- RecordBatchProjector* projector,
- MemoryPool* pool) {
+inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
+ RecordBatchProjector* projector,
+ MemoryPool* pool) {
return MakeMaybeMapIterator(
[=](std::shared_ptr<RecordBatch> in) {
// The RecordBatchProjector is shared accross ScanTasks of the same
@@ -71,5 +70,21 @@ class FilterAndProjectScanTask : public ScanTask {
std::shared_ptr<ScanTask> task_;
};
+/// \brief GetScanTaskIterator transforms an Iterator<Fragment> in a
+/// flattened Iterator<ScanTask>.
+inline ScanTaskIterator GetScanTaskIterator(FragmentIterator fragments,
+ std::shared_ptr<ScanContext> context) {
+ // Fragment -> ScanTaskIterator
+ auto fn = [context](std::shared_ptr<Fragment> fragment) {
+ return fragment->Scan(context);
+ };
+
+ // Iterator<Iterator<ScanTask>>
+ auto maybe_scantask_it = MakeMaybeMapIterator(fn, std::move(fragments));
+
+ // Iterator<ScanTask>
+ return MakeFlattenIterator(std::move(maybe_scantask_it));
+}
+
} // namespace dataset
} // namespace arrow
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 7e33a2f..b7feaef 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -316,7 +316,7 @@ struct MakeFileSystemDatasetMixin {
static const std::string& PathOf(const std::shared_ptr<Fragment>& fragment) {
EXPECT_NE(fragment, nullptr);
- EXPECT_EQ(fragment->type_name(), "file");
+ EXPECT_THAT(fragment->type_name(), "dummy");
return internal::checked_cast<const FileFragment&>(*fragment).source().path();
}
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index 141ede8..118b94d 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -53,6 +53,11 @@ class FileFormat;
class FileFragment;
class FileSystemDataset;
+class ParquetFileFormat;
+class ParquetFileFragment;
+
+class IpcFileFormat;
+
class Expression;
using ExpressionVector = std::vector<std::shared_ptr<Expression>>;
@@ -67,6 +72,10 @@ class ScalarExpression;
class FieldReferenceExpression;
class ExpressionEvaluator;
+/// forward declared to facilitate scalar(true) as a default for Expression parameters
+ARROW_DS_EXPORT
+std::shared_ptr<ScalarExpression> scalar(bool);
+
class Partitioning;
class PartitioningFactory;
class PartitioningOrFactory;
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index f4a2d32..fae8a40 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -156,7 +156,7 @@ struct ARROW_EXPORT FileSelector {
};
/// \brief Abstract file system API
-class ARROW_EXPORT FileSystem {
+class ARROW_EXPORT FileSystem : public std::enable_shared_from_this<FileSystem> {
public:
virtual ~FileSystem();
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index fd5aac5..de55002 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -27,8 +27,6 @@ from pyarrow.lib cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.compat import frombytes, tobytes
from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
-from pyarrow.types import (is_null, is_boolean, is_integer, is_floating,
- is_string)
def _forbid_instantiation(klass, subclasses_instead=True):
@@ -336,6 +334,11 @@ cdef class FileSystemDataset(Dataset):
return FileFormat.wrap(self.filesystem_dataset.format())
+def _empty_dataset_scanner(Schema schema not None, columns=None, filter=None):
+ dataset = UnionDataset(schema, children=[])
+ return Scanner(dataset, columns=columns, filter=filter)
+
+
cdef class FileFormat:
cdef:
@@ -367,6 +370,35 @@ cdef class FileFormat:
cdef inline shared_ptr[CFileFormat] unwrap(self):
return self.wrapped
+ def inspect(self, str path not None, FileSystem filesystem not None):
+ cdef:
+ shared_ptr[CSchema] c_schema
+
+ c_schema = GetResultValue(self.format.Inspect(CFileSource(
+ tobytes(path), filesystem.unwrap().get())))
+ return pyarrow_wrap_schema(move(c_schema))
+
+ def make_fragment(self, str path not None, FileSystem filesystem not None,
+ Schema schema=None, columns=None, filter=None,
+ Expression partition_expression=ScalarExpression(True)):
+ cdef:
+ shared_ptr[CScanOptions] c_options
+ shared_ptr[CFileFragment] c_fragment
+ Scanner scanner
+
+ if schema is None:
+ schema = self.inspect(path, filesystem)
+
+ scanner = _empty_dataset_scanner(schema, columns, filter)
+ c_options = scanner.unwrap().get().options()
+
+ c_fragment = GetResultValue(
+ self.format.MakeFragment(CFileSource(tobytes(path),
+ filesystem.unwrap().get()),
+ move(c_options),
+ partition_expression.unwrap()))
+ return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+
cdef class Fragment:
"""Fragment of data from a Dataset."""
@@ -386,8 +418,12 @@ cdef class Fragment:
cdef Fragment self = Fragment()
typ = frombytes(sp.get().type_name())
- if typ == 'file':
+ if typ == 'ipc':
+ # IpcFileFormat does not have a corresponding subclass
+ # of FileFragment
self = FileFragment.__new__(FileFragment)
+ elif typ == 'parquet':
+ self = ParquetFileFragment.__new__(ParquetFileFragment)
else:
self = Fragment()
@@ -396,12 +432,66 @@ cdef class Fragment:
@property
def partition_expression(self):
- """
- An Expression which evaluates to true for all data viewed by this
+ """An Expression which evaluates to true for all data viewed by this
Fragment.
"""
return Expression.wrap(self.fragment.partition_expression())
+    def to_table(self, use_threads=True, MemoryPool memory_pool=None):
+        """Convert this Fragment into a Table.
+
+        Use this convenience utility with care. This will serially materialize
+        the Scan result in memory before creating the Table.
+
+        Parameters
+        ----------
+        use_threads : bool, default True
+            Not read by the body of this method.
+        memory_pool : MemoryPool, default None
+            Unboxed with maybe_unbox_memory_pool and assigned to the scan
+            context's pool.
+
+        Returns
+        -------
+        table : Table
+        """
+        cdef:
+            shared_ptr[CScanContext] c_context
+            shared_ptr[CScanOptions] c_options
+            CScanTaskIterator c_tasks
+            shared_ptr[CTable] c_table
+
+        c_context = make_shared[CScanContext]()
+        c_context.get().pool = maybe_unbox_memory_pool(memory_pool)
+
+        c_options = self.fragment.scan_options()
+        c_tasks = move(GetResultValue(self.fragment.Scan(c_context)))
+
+        # The same context is used both to scan and to assemble the table.
+        c_table = GetResultValue(
+            CScanTask.ToTable(c_options, c_context, move(c_tasks)))
+        return pyarrow_wrap_table(c_table)
+
+    def scan(self, MemoryPool memory_pool=None):
+        """Returns a stream of ScanTasks
+
+        The caller is responsible for dispatching/scheduling said tasks.
+        Tasks should be safe to run in a concurrent fashion and outlive the
+        iterator.
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            Unboxed with maybe_unbox_memory_pool and assigned to the scan
+            context's pool.
+
+        Returns
+        -------
+        scan_tasks : iterator of ScanTask
+        """
+        cdef:
+            shared_ptr[CScanContext] context
+            CScanTaskIterator iterator
+            shared_ptr[CScanTask] task
+
+        # create scan context
+        context = make_shared[CScanContext]()
+        context.get().pool = maybe_unbox_memory_pool(memory_pool)
+
+        iterator = move(GetResultValue(self.fragment.Scan(move(context))))
+
+        while True:
+            task = GetResultValue(iterator.Next())
+            if task.get() == nullptr:
+                # A null task marks the end of the stream. Finish the
+                # generator with a plain return: where PEP 479 is in effect,
+                # a StopIteration raised inside a generator body is turned
+                # into a RuntimeError.
+                return
+            yield ScanTask.wrap(task)
+
cdef class FileFragment(Fragment):
"""A Fragment representing a data file."""
@@ -416,11 +506,38 @@ cdef class FileFragment(Fragment):
@property
def path(self):
"""
- The path of the data file viewed by this fragment.
+ The path of the data file viewed by this fragment, if it views a
+ file. If instead it views a buffer, this will be "<Buffer>".
"""
return frombytes(self.file_fragment.source().path())
@property
+    def filesystem(self):
+        """
+        The FileSystem containing the data file viewed by this fragment, if
+        it views a file. If instead it views a buffer, this will be None.
+        """
+        cdef:
+            CFileSystem* c_fs
+
+        c_fs = self.file_fragment.source().filesystem()
+        # filesystem() hands back a raw pointer. For a buffer-backed source
+        # it is presumably null (the docstring promises None in that case),
+        # and calling shared_from_this() through a null pointer is undefined
+        # behavior, so check before dereferencing.
+        if c_fs == nullptr:
+            return None
+
+        return FileSystem.wrap(c_fs.shared_from_this())
+
+    @property
+    def buffer(self):
+        """
+        The buffer viewed by this fragment, if it views a buffer. If
+        instead it views a file, this will be None.
+        """
+        cdef:
+            shared_ptr[CBuffer] sp_buffer
+
+        sp_buffer = self.file_fragment.source().buffer()
+
+        # A null shared_ptr means the source has no buffer to expose.
+        if sp_buffer.get() != nullptr:
+            return pyarrow_wrap_buffer(sp_buffer)
+
+        return None
+
+ @property
def format(self):
"""
The format of the data file viewed by this fragment.
@@ -428,6 +545,52 @@ cdef class FileFragment(Fragment):
return FileFormat.wrap(self.file_fragment.format())
+cdef class ParquetFileFragment(FileFragment):
+    """A Fragment representing a parquet file."""
+
+    cdef:
+        CParquetFileFragment* parquet_file_fragment
+
+    cdef void init(self, const shared_ptr[CFragment]& sp):
+        FileFragment.init(self, sp)
+        # Raw pointer to the same object, downcast to the parquet-specific
+        # fragment type so row-group accessors can be called on it.
+        self.parquet_file_fragment = <CParquetFileFragment*> sp.get()
+
+    @property
+    def row_groups(self):
+        """
+        The set of row group indices held by this fragment, or None when
+        the fragment holds an empty list of row groups.
+        """
+        row_groups = set(self.parquet_file_fragment.row_groups())
+        if len(row_groups) != 0:
+            return row_groups
+        return None
+
+    def get_row_group_fragments(self, Expression extra_filter=None):
+        """
+        Yield a Fragment wrapping each row group in this ParquetFileFragment.
+        Row groups will be excluded whose metadata contradicts either the
+        filter provided on construction of this Fragment or the extra_filter
+        argument.
+
+        Parameters
+        ----------
+        extra_filter : Expression, default None
+            Additional filter; None is replaced by ScalarExpression(True).
+        """
+        cdef:
+            CParquetFileFormat* c_format
+            CFragmentIterator c_iterator
+            shared_ptr[CExpression] c_extra_filter
+            shared_ptr[CFragment] c_fragment
+
+        if extra_filter is None:
+            extra_filter = ScalarExpression(True)
+        c_extra_filter = extra_filter.unwrap()
+
+        c_format = <CParquetFileFormat*> self.file_fragment.format().get()
+        c_iterator = move(GetResultValue(c_format.GetRowGroupFragments(deref(
+            self.parquet_file_fragment), move(c_extra_filter))))
+
+        while True:
+            c_fragment = GetResultValue(c_iterator.Next())
+            if c_fragment.get() == nullptr:
+                # A null fragment marks the end of the stream. Finish the
+                # generator with a plain return: where PEP 479 is in effect,
+                # a StopIteration raised inside a generator body is turned
+                # into a RuntimeError.
+                return
+            yield Fragment.wrap(c_fragment)
+
+
cdef class ParquetFileFormatReaderOptions:
cdef:
CParquetFileFormatReaderOptions* options
@@ -474,14 +637,48 @@ cdef class ParquetFileFormat(FileFormat):
def __init__(self, dict reader_options=dict()):
self.init(<shared_ptr[CFileFormat]> make_shared[CParquetFileFormat]())
- self.parquet_format = <CParquetFileFormat*> self.wrapped.get()
for name, value in reader_options.items():
setattr(self.reader_options, name, value)
+ cdef void init(self, const shared_ptr[CFileFormat]& sp):
+ # Run the base initializer first; the next line reads self.wrapped,
+ # which FileFormat.init presumably sets from sp (not visible here).
+ FileFormat.init(self, sp)
+ # Keep a raw pointer downcast to the parquet-specific format type.
+ self.parquet_format = <CParquetFileFormat*> self.wrapped.get()
+
@property
def reader_options(self):
return ParquetFileFormatReaderOptions(self)
+    def make_fragment(self, str path not None, FileSystem filesystem not None,
+                      Schema schema=None, columns=None, filter=None,
+                      Expression partition_expression=ScalarExpression(True),
+                      row_groups=None):
+        """Create a Fragment viewing the parquet file at ``path``.
+
+        Parameters
+        ----------
+        path : str
+            Location of the file within ``filesystem``.
+        filesystem : FileSystem
+            Filesystem through which ``path`` is accessed.
+        schema : Schema, default None
+            Schema used to build the scan options. When None, the result of
+            ``self.inspect(path, filesystem)`` is used.
+        columns, filter : default None
+            Forwarded unchanged to the Scanner that produces the scan
+            options.
+        partition_expression : Expression, default ScalarExpression(True)
+            Partition expression attached to the new fragment.
+        row_groups : iterable of int, default None
+            Indices of the row groups the fragment should view. Duplicates
+            are dropped. When None, the generic FileFormat.make_fragment
+            is used instead.
+
+        Returns
+        -------
+        fragment : Fragment
+        """
+        cdef:
+            shared_ptr[CScanOptions] c_options
+            shared_ptr[CFileFragment] c_fragment
+            Scanner scanner
+            vector[int] c_row_groups
+
+        if row_groups is None:
+            return super().make_fragment(path, filesystem, schema, columns,
+                                         filter, partition_expression)
+
+        # Deduplicate, then sort: iterating a bare set yields its members in
+        # an arbitrary order, which would make the order of row groups handed
+        # to C++ depend on hashing details.
+        for row_group in sorted(set(row_groups)):
+            c_row_groups.push_back(<int> row_group)
+
+        if schema is None:
+            schema = self.inspect(path, filesystem)
+
+        # The scanner over an empty dataset exists only to produce scan
+        # options for the requested schema/columns/filter.
+        scanner = _empty_dataset_scanner(schema, columns, filter)
+        c_options = scanner.unwrap().get().options()
+
+        c_fragment = GetResultValue(
+            self.parquet_format.MakeFragment(
+                CFileSource(tobytes(path), filesystem.unwrap().get()),
+                move(c_options),
+                partition_expression.unwrap(),
+                move(c_row_groups)))
+        return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+
cdef class IpcFileFormat(FileFormat):
@@ -1109,6 +1306,9 @@ cdef class Scanner:
self.init(sp)
return self
+ cdef inline shared_ptr[CScanner] unwrap(self):
+ # Expose the shared_ptr held in self.wrapped to other Cython code.
+ return self.wrapped
+
def scan(self):
"""Returns a stream of ScanTasks
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 5e087ad..21f46e3 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -43,6 +43,7 @@ from pyarrow._dataset import ( # noqa
NotExpression,
OrExpression,
ParquetFileFormat,
+ ParquetFileFragment,
Partitioning,
PartitioningFactory,
ScalarExpression,
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 6229c50..7118f4d 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -132,16 +132,25 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
"arrow::dataset::InsertImplicitCasts"(
const CExpression &, const CSchema&)
+ # Declared with an empty body: no members of ScanOptions are exposed
+ # to Cython by this declaration.
+ cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
+ pass
+
cdef cppclass CScanContext "arrow::dataset::ScanContext":
CMemoryPool * pool
- cdef cppclass CScanTask" arrow::dataset::ScanTask":
- CResult[CRecordBatchIterator] Execute()
-
ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \
"arrow::dataset::ScanTaskIterator"
+    # Declared after the CScanTaskIterator typedef, which ToTable's
+    # signature refers to.
+    cdef cppclass CScanTask "arrow::dataset::ScanTask":
+        CResult[CRecordBatchIterator] Execute()
+
+        # Static helper taking scan options, a scan context and an iterator
+        # of tasks, and returning a Table result.
+        @staticmethod
+        CResult[shared_ptr[CTable]] ToTable(
+            const shared_ptr[CScanOptions]&,
+            const shared_ptr[CScanContext]&,
+            CScanTaskIterator)
+
cdef cppclass CFragment "arrow::dataset::Fragment":
+ const shared_ptr[CScanOptions]& scan_options() const
CResult[CScanTaskIterator] Scan(shared_ptr[CScanContext] context)
c_bool splittable()
c_string type_name()
@@ -157,6 +166,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CResult[CScanTaskIterator] Scan()
CResult[shared_ptr[CTable]] ToTable()
CFragmentIterator GetFragments()
+ const shared_ptr[CScanOptions]& options()
cdef cppclass CScannerBuilder "arrow::dataset::ScannerBuilder":
CScannerBuilder(shared_ptr[CDataset],
@@ -210,15 +220,27 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
vector[shared_ptr[CDatasetFactory]] factories)
cdef cppclass CFileSource "arrow::dataset::FileSource":
- const c_string& path()
+ const c_string& path() const
+ CFileSystem* filesystem() const
+ const shared_ptr[CBuffer]& buffer() const
+ CFileSource(c_string path, CFileSystem* filesystem)
cdef cppclass CFileFormat "arrow::dataset::FileFormat":
- c_string type_name()
+ c_string type_name() const
+ CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const
+ CResult[shared_ptr[CFileFragment]] MakeFragment(
+ CFileSource source,
+ shared_ptr[CScanOptions] options,
+ shared_ptr[CExpression] partition_expression)
cdef cppclass CFileFragment "arrow::dataset::FileFragment"(
CFragment):
- const CFileSource& source()
- shared_ptr[CFileFormat] format()
+ const CFileSource& source() const
+ const shared_ptr[CFileFormat]& format() const
+
+ # Parquet-specific FileFragment; the only extra member exposed here is
+ # the read-only vector of row group indices.
+ cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
+ CFileFragment):
+ const vector[int]& row_groups() const
cdef cppclass CFileSystemDataset \
"arrow::dataset::FileSystemDataset"(CDataset):
@@ -243,6 +265,14 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
CFileFormat):
CParquetFileFormatReaderOptions reader_options
+ # Takes a parquet fragment plus an extra filter expression and returns
+ # an iterator of fragments.
+ CResult[CFragmentIterator] GetRowGroupFragments(
+ const CParquetFileFragment&,
+ shared_ptr[CExpression] extra_filter)
+ # Overload of MakeFragment with an additional vector of row group
+ # indices as its last argument.
+ CResult[shared_ptr[CFileFragment]] MakeFragment(
+ CFileSource source,
+ shared_ptr[CScanOptions] options,
+ shared_ptr[CExpression] partition_expression,
+ vector[int] row_groups)
cdef cppclass CIpcFileFormat "arrow::dataset::IpcFileFormat"(
CFileFormat):
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index f3d46ed..824d830 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -59,6 +59,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
c_bool recursive
cdef cppclass CFileSystem "arrow::fs::FileSystem":
+ shared_ptr[CFileSystem] shared_from_this()
c_string type_name() const
CResult[c_string] NormalizePath(c_string path)
CResult[CFileInfo] GetFileInfo(const c_string& path)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index cf62b32..05ef3ff 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -85,19 +85,6 @@ def mockfs(request):
mockfs = fs._MockFileSystem()
- data = [
- list(range(5)),
- list(map(float, range(5))),
- list(map(str, range(5)))
- ]
- schema = pa.schema([
- pa.field('i64', pa.int64()),
- pa.field('f64', pa.float64()),
- pa.field('str', pa.string())
- ])
- batch = pa.record_batch(data, schema=schema)
- table = pa.Table.from_batches([batch])
-
directories = [
'subdir/1/xxx',
'subdir/2/yyy',
@@ -107,6 +94,21 @@ def mockfs(request):
path = '{}/file{}.parquet'.format(directory, i)
mockfs.create_dir(directory)
with mockfs.open_output_stream(path) as out:
+ data = [
+ list(range(5)),
+ list(map(float, range(5))),
+ list(map(str, range(5))),
+ [i] * 5
+ ]
+ schema = pa.schema([
+ pa.field('i64', pa.int64()),
+ pa.field('f64', pa.float64()),
+ pa.field('str', pa.string()),
+ pa.field('const', pa.int64()),
+ ])
+ batch = pa.record_batch(data, schema=schema)
+ table = pa.Table.from_batches([batch])
+
pq.write_table(table, out)
return mockfs
@@ -179,7 +181,9 @@ def dataset(mockfs):
def test_filesystem_dataset(mockfs):
- schema = pa.schema([])
+ schema = pa.schema([
+ pa.field('const', pa.int64())
+ ])
file_format = ds.ParquetFileFormat()
@@ -225,12 +229,24 @@ def test_filesystem_dataset(mockfs):
assert set(dataset.files) == set(paths)
fragments = list(dataset.get_fragments())
- assert fragments[0].partition_expression.equals(
- ds.AndExpression(root_partition, partitions[0]))
- assert fragments[1].partition_expression.equals(
- ds.AndExpression(root_partition, partitions[1]))
- assert fragments[0].path == paths[0]
- assert fragments[1].path == paths[1]
+ for fragment, partition, path in zip(fragments, partitions, paths):
+ assert fragment.partition_expression.equals(
+ ds.AndExpression(root_partition, partition))
+ assert fragment.path == path
+ assert isinstance(fragment, ds.ParquetFileFragment)
+ assert fragment.row_groups is None
+
+ row_group_fragments = list(fragment.get_row_group_fragments())
+ assert len(row_group_fragments) == 1
+ assert isinstance(fragment, ds.ParquetFileFragment)
+ assert row_group_fragments[0].path == path
+ assert row_group_fragments[0].row_groups == {0}
+
+ # test predicate pushdown using row group metadata
+ fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
+ assert len(fragments) == 2
+ assert len(list(fragments[0].get_row_group_fragments())) == 1
+ assert len(list(fragments[1].get_row_group_fragments())) == 0
def test_dataset(dataset):
@@ -505,6 +521,7 @@ def test_filesystem_factory(mockfs, paths_or_selector):
pa.field('i64', pa.int64()),
pa.field('f64', pa.float64()),
pa.field('str', pa.dictionary(pa.int32(), pa.string())),
+ pa.field('const', pa.int64()),
pa.field('group', pa.int32()),
pa.field('key', pa.string()),
]), check_metadata=False)
@@ -525,20 +542,61 @@ def test_filesystem_factory(mockfs, paths_or_selector):
pa.array([0, 1, 2, 3, 4], type=pa.int32()),
pa.array("0 1 2 3 4".split(), type=pa.string()))
for task, group, key in zip(scanner.scan(), [1, 2], ['xxx', 'yyy']):
- expected_group_column = pa.array([group] * 5, type=pa.int32())
- expected_key_column = pa.array([key] * 5, type=pa.string())
+ expected_group = pa.array([group] * 5, type=pa.int32())
+ expected_key = pa.array([key] * 5, type=pa.string())
+ expected_const = pa.array([group - 1] * 5, type=pa.int64())
for batch in task.execute():
- assert batch.num_columns == 5
+ assert batch.num_columns == 6
assert batch[0].equals(expected_i64)
assert batch[1].equals(expected_f64)
assert batch[2].equals(expected_str)
- assert batch[3].equals(expected_group_column)
- assert batch[4].equals(expected_key_column)
+ assert batch[3].equals(expected_const)
+ assert batch[4].equals(expected_group)
+ assert batch[5].equals(expected_key)
table = dataset.to_table()
assert isinstance(table, pa.Table)
assert len(table) == 10
- assert table.num_columns == 5
+ assert table.num_columns == 6
+
+
+def test_make_fragment(multisourcefs):
+ parquet_format = ds.ParquetFileFormat()
+ dataset = ds.dataset('/plain', filesystem=multisourcefs,
+ format=parquet_format)
+
+ for path in dataset.files:
+ fragment = parquet_format.make_fragment(path, multisourcefs)
+ row_group_fragment = parquet_format.make_fragment(path, multisourcefs,
+ row_groups=[0])
+ for f in [fragment, row_group_fragment]:
+ assert isinstance(f, ds.ParquetFileFragment)
+ assert f.path == path
+ assert isinstance(f.filesystem, type(multisourcefs))
+ assert fragment.row_groups is None
+ assert row_group_fragment.row_groups == {0}
+
+
+def test_parquet_row_group_fragments(tempdir):
+ import pyarrow as pa
+ import pyarrow.parquet as pq
+
+ table = pa.table({'a': ['a', 'a', 'b', 'b'], 'b': [1, 2, 3, 4]})
+ pq.write_to_dataset(table, str(tempdir / "test_parquet_dataset"),
+ partition_cols=["a"])
+
+ import pyarrow.dataset as ds
+ dataset = ds.dataset(str(tempdir / "test_parquet_dataset/"),
+ format="parquet", partitioning="hive")
+
+ fragments = list(dataset.get_fragments())
+ f = fragments[0]
+ parquet_format = f.format
+ parquet_format.make_fragment(f.path, f.filesystem,
+ partition_expression=f.partition_expression)
+ parquet_format.make_fragment(
+ f.path, f.filesystem, partition_expression=f.partition_expression,
+ row_groups={1})
def test_partitioning_factory(mockfs):
@@ -559,6 +617,7 @@ def test_partitioning_factory(mockfs):
("i64", pa.int64()),
("f64", pa.float64()),
("str", pa.string()),
+ ("const", pa.int64()),
("group", pa.int32()),
("key", pa.string()),
])