You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by bk...@apache.org on 2020/06/26 03:07:00 UTC
[arrow] branch master updated: ARROW-8733: [C++][Dataset][Python] Expose RowGroupInfo statistics values
This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 83fac7a ARROW-8733: [C++][Dataset][Python] Expose RowGroupInfo statistics values
83fac7a is described below
commit 83fac7aaf1cc2cd5b4e7c6229bb5b0aff6653086
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Thu Jun 25 23:06:30 2020 -0400
ARROW-8733: [C++][Dataset][Python] Expose RowGroupInfo statistics values
```python
stats = parquet_fragment.row_groups[0].statistics
assert stats == {
    'normal_column': {'min': 1, 'max': 2},
    'all_null_column': {'min': None, 'max': None},
    'column_without_stats': None,
}
```
Closes #7546 from bkietz/8733-row-group-statistics
Authored-by: Benjamin Kietzman <be...@gmail.com>
Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
cpp/src/arrow/dataset/file_parquet.cc | 82 +++++++++++++++++++++-------
cpp/src/arrow/dataset/file_parquet.h | 20 +++++--
python/pyarrow/_dataset.pyx | 22 ++++++++
python/pyarrow/includes/libarrow.pxd | 3 +
python/pyarrow/includes/libarrow_dataset.pxd | 4 +-
python/pyarrow/tests/test_dataset.py | 6 ++
6 files changed, 110 insertions(+), 27 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 3eb4c2b..c02f084 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -124,22 +124,32 @@ static Result<SchemaManifest> GetSchemaManifest(
return manifest;
}
-static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
+static std::shared_ptr<StructScalar> MakeMinMaxScalar(std::shared_ptr<Scalar> min,
+ std::shared_ptr<Scalar> max) {
+ DCHECK(min->type->Equals(max->type));
+ return std::make_shared<StructScalar>(ScalarVector{min, max},
+ struct_({
+ field("min", min->type),
+ field("max", max->type),
+ }));
+}
+
+static std::shared_ptr<StructScalar> ColumnChunkStatisticsAsStructScalar(
const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
// For the remaining of this function, failure to extract/parse statistics
- // are ignored by returning the `true` scalar. The goal is two fold. First
- // avoid that an optimization break the computation. Second, allow the
+ // are ignored by returning nullptr. The goal is two fold. First
+ // avoid an optimization which breaks the computation. Second, allow the
// following columns to maybe succeed in extracting column statistics.
// For now, only leaf (primitive) types are supported.
if (!schema_field.is_leaf()) {
- return scalar(true);
+ return nullptr;
}
auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
auto statistics = column_metadata->statistics();
if (statistics == nullptr) {
- return scalar(true);
+ return nullptr;
}
const auto& field = schema_field.field;
@@ -147,28 +157,31 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
// Optimize for corner case where all values are nulls
if (statistics->num_values() == statistics->null_count()) {
- return equal(field_expr, scalar(MakeNullScalar(field->type())));
+ auto null = MakeNullScalar(field->type());
+ return MakeMinMaxScalar(null, null);
}
std::shared_ptr<Scalar> min, max;
if (!StatisticsAsScalars(*statistics, &min, &max).ok()) {
- return scalar(true);
+ return nullptr;
}
- return and_(greater_equal(field_expr, scalar(min)),
- less_equal(field_expr, scalar(max)));
+ return MakeMinMaxScalar(std::move(min), std::move(max));
}
-static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+static std::shared_ptr<StructScalar> RowGroupStatisticsAsStructScalar(
const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
- const auto& fields = manifest.schema_fields;
- ExpressionVector expressions;
- expressions.reserve(fields.size());
- for (const auto& field : fields) {
- expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, metadata));
+ FieldVector fields;
+ ScalarVector statistics;
+ for (const auto& schema_field : manifest.schema_fields) {
+ if (auto min_max = ColumnChunkStatisticsAsStructScalar(schema_field, metadata)) {
+ fields.push_back(field(schema_field.field->name(), min_max->type));
+ statistics.push_back(std::move(min_max));
+ }
}
- return expressions.empty() ? scalar(true) : and_(expressions);
+ return std::make_shared<StructScalar>(std::move(statistics),
+ struct_(std::move(fields)));
}
class ParquetScanTaskIterator {
@@ -349,7 +362,7 @@ static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
if (!info.HasStatistics() && info.id() < num_row_groups) {
auto row_group = metadata->RowGroup(info.id());
info.set_num_rows(row_group->num_rows());
- info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+ info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group, manifest));
}
};
std::for_each(row_groups.begin(), row_groups.end(), augment);
@@ -444,8 +457,39 @@ std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
return result;
}
+void RowGroupInfo::SetStatisticsExpression() {
+ if (!HasStatistics()) {
+ statistics_expression_ = nullptr;
+ return;
+ }
+
+ if (statistics_->value.empty()) {
+ statistics_expression_ = scalar(true);
+ return;
+ }
+
+ ExpressionVector expressions{statistics_->value.size()};
+
+ for (size_t i = 0; i < expressions.size(); ++i) {
+ const auto& col_stats =
+ internal::checked_cast<const StructScalar&>(*statistics_->value[i]);
+ auto field_expr = field_ref(statistics_->type->field(static_cast<int>(i))->name());
+
+ DCHECK_EQ(col_stats.value.size(), 2);
+ const auto& min = col_stats.value[0];
+ const auto& max = col_stats.value[1];
+
+ DCHECK_EQ(min->is_valid, max->is_valid);
+ expressions[i] = min->is_valid ? and_(greater_equal(field_expr, scalar(min)),
+ less_equal(field_expr, scalar(max)))
+ : equal(std::move(field_expr), scalar(min));
+ }
+
+ statistics_expression_ = and_(std::move(expressions));
+}
+
bool RowGroupInfo::Satisfy(const Expression& predicate) const {
- return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
+ return !HasStatistics() || predicate.IsSatisfiableWith(statistics_expression_);
}
///
@@ -622,7 +666,7 @@ ParquetDatasetFactory::CollectParquetFragments(
auto row_group = metadata.RowGroup(i);
ARROW_ASSIGN_OR_RAISE(auto path,
FileFromRowGroup(filesystem_.get(), base_path_, *row_group));
- auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+ auto stats = RowGroupStatisticsAsStructScalar(*row_group, manifest);
auto num_rows = row_group->num_rows();
// Insert the path, or increase the count of row groups. It will be
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index f5fba7e..182280e 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -134,8 +134,10 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
/// \brief Construct a RowGroup from an identifier with statistics.
- RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
- : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+ RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<StructScalar> statistics)
+ : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {
+ SetStatisticsExpression();
+ }
/// \brief Transform a vector of identifiers into a vector of RowGroupInfos
static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
@@ -150,10 +152,13 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
int64_t num_rows() const { return num_rows_; }
void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
- /// \brief Return the RowGroup's statistics
- const std::shared_ptr<Expression>& statistics() const { return statistics_; }
- void set_statistics(std::shared_ptr<Expression> statistics) {
+ /// \brief Return the RowGroup's statistics as a StructScalar with a field for
+ /// each column with statistics.
+ /// Each field will also be a StructScalar with "min" and "max" fields.
+ const std::shared_ptr<StructScalar>& statistics() const { return statistics_; }
+ void set_statistics(std::shared_ptr<StructScalar> statistics) {
statistics_ = std::move(statistics);
+ SetStatisticsExpression();
}
/// \brief Indicate if statistics are set.
@@ -169,9 +174,12 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
private:
+ void SetStatisticsExpression();
+
int id_;
int64_t num_rows_;
- std::shared_ptr<Expression> statistics_;
+ std::shared_ptr<Expression> statistics_expression_;
+ std::shared_ptr<StructScalar> statistics_;
};
/// \brief A FileFragment with parquet logic.
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 5ffb11f..8047c5f 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -845,6 +845,28 @@ cdef class RowGroupInfo:
def num_rows(self):
return self.info.num_rows()
+ @property
+ def statistics(self):
+ if not self.info.HasStatistics():
+ return None
+
+ cdef:
+ CStructScalar* c_statistics
+ CStructScalar* c_minmax
+
+ statistics = dict()
+ c_statistics = self.info.statistics().get()
+ for i in range(c_statistics.value.size()):
+ name = frombytes(c_statistics.type.get().field(i).get().name())
+ c_minmax = <CStructScalar*> c_statistics.value[i].get()
+
+ statistics[name] = {
+ 'min': pyarrow_wrap_scalar(c_minmax.value[0]).as_py(),
+ 'max': pyarrow_wrap_scalar(c_minmax.value[1]).as_py(),
+ }
+
+ return statistics
+
def __eq__(self, other):
if not isinstance(other, RowGroupInfo):
return False
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index fc6ae62..7f416cb 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -886,6 +886,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
cdef cppclass CStringScalar" arrow::StringScalar"(CScalar):
shared_ptr[CBuffer] value
+ cdef cppclass CStructScalar" arrow::StructScalar"(CScalar):
+ vector[shared_ptr[CScalar]] value
+
shared_ptr[CScalar] MakeScalar[Value](Value value)
shared_ptr[CScalar] MakeStringScalar" arrow::MakeScalar"(c_string value)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index f80fdb9..6823bd9 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -216,11 +216,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CRowGroupInfo "arrow::dataset::RowGroupInfo":
CRowGroupInfo()
CRowGroupInfo(int id)
- CRowGroupInfo(
- int id, int64_t n_rows, shared_ptr[CExpression] statistics)
int id() const
int64_t num_rows() const
bint Equals(const CRowGroupInfo& other)
+ c_bool HasStatistics() const
+ shared_ptr[CStructScalar] statistics() const
cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
CFileFragment):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 08fec4d..2a59f62 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -715,6 +715,12 @@ def test_fragments_parquet_row_groups(tempdir):
assert len(result) == 2
assert result.equals(table.slice(0, 2))
+ assert row_group_fragments[0].row_groups is not None
+ assert row_group_fragments[0].row_groups[0].statistics == {
+ 'f1': {'min': 0, 'max': 1},
+ 'f2': {'min': 1, 'max': 1},
+ }
+
fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0]
row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1))
assert len(row_group_fragments) == 1