You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by bk...@apache.org on 2020/06/26 03:07:00 UTC

[arrow] branch master updated: ARROW-8733: [C++][Dataset][Python] Expose RowGroupInfo statistics values

This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 83fac7a  ARROW-8733: [C++][Dataset][Python] Expose RowGroupInfo statistics values
83fac7a is described below

commit 83fac7aaf1cc2cd5b4e7c6229bb5b0aff6653086
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Thu Jun 25 23:06:30 2020 -0400

    ARROW-8733: [C++][Dataset][Python] Expose RowGroupInfo statistics values
    
    ```python
    stats = parquet_fragment.row_groups[0].statistics
    assert stats == {
      'normal_column': {'min': 1, 'max': 2},
      'all_null_column': {'min': None, 'max': None},
      # 'column_without_stats' has no entry: columns lacking statistics are omitted
    }
    ```
    
    Closes #7546 from bkietz/8733-row-group-statistics
    
    Authored-by: Benjamin Kietzman <be...@gmail.com>
    Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
 cpp/src/arrow/dataset/file_parquet.cc        | 82 +++++++++++++++++++++-------
 cpp/src/arrow/dataset/file_parquet.h         | 20 +++++--
 python/pyarrow/_dataset.pyx                  | 22 ++++++++
 python/pyarrow/includes/libarrow.pxd         |  3 +
 python/pyarrow/includes/libarrow_dataset.pxd |  4 +-
 python/pyarrow/tests/test_dataset.py         |  6 ++
 6 files changed, 110 insertions(+), 27 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 3eb4c2b..c02f084 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -124,22 +124,32 @@ static Result<SchemaManifest> GetSchemaManifest(
   return manifest;
 }
 
-static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
+static std::shared_ptr<StructScalar> MakeMinMaxScalar(std::shared_ptr<Scalar> min,
+                                                      std::shared_ptr<Scalar> max) {
+  DCHECK(min->type->Equals(max->type));
+  return std::make_shared<StructScalar>(ScalarVector{min, max},
+                                        struct_({
+                                            field("min", min->type),
+                                            field("max", max->type),
+                                        }));
+}
+
+static std::shared_ptr<StructScalar> ColumnChunkStatisticsAsStructScalar(
     const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
   // For the remaining of this function, failure to extract/parse statistics
-  // are ignored by returning the `true` scalar. The goal is two fold. First
-  // avoid that an optimization break the computation. Second, allow the
+  // is ignored by returning nullptr. The goal is twofold. First, keep
+  // this optimization from breaking the computation. Second, allow the
   // following columns to maybe succeed in extracting column statistics.
 
   // For now, only leaf (primitive) types are supported.
   if (!schema_field.is_leaf()) {
-    return scalar(true);
+    return nullptr;
   }
 
   auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
   auto statistics = column_metadata->statistics();
   if (statistics == nullptr) {
-    return scalar(true);
+    return nullptr;
   }
 
   const auto& field = schema_field.field;
@@ -147,28 +157,31 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
 
   // Optimize for corner case where all values are nulls
   if (statistics->num_values() == statistics->null_count()) {
-    return equal(field_expr, scalar(MakeNullScalar(field->type())));
+    auto null = MakeNullScalar(field->type());
+    return MakeMinMaxScalar(null, null);
   }
 
   std::shared_ptr<Scalar> min, max;
   if (!StatisticsAsScalars(*statistics, &min, &max).ok()) {
-    return scalar(true);
+    return nullptr;
   }
 
-  return and_(greater_equal(field_expr, scalar(min)),
-              less_equal(field_expr, scalar(max)));
+  return MakeMinMaxScalar(std::move(min), std::move(max));
 }
 
-static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+static std::shared_ptr<StructScalar> RowGroupStatisticsAsStructScalar(
     const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
-  const auto& fields = manifest.schema_fields;
-  ExpressionVector expressions;
-  expressions.reserve(fields.size());
-  for (const auto& field : fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, metadata));
+  FieldVector fields;
+  ScalarVector statistics;
+  for (const auto& schema_field : manifest.schema_fields) {
+    if (auto min_max = ColumnChunkStatisticsAsStructScalar(schema_field, metadata)) {
+      fields.push_back(field(schema_field.field->name(), min_max->type));
+      statistics.push_back(std::move(min_max));
+    }
   }
 
-  return expressions.empty() ? scalar(true) : and_(expressions);
+  return std::make_shared<StructScalar>(std::move(statistics),
+                                        struct_(std::move(fields)));
 }
 
 class ParquetScanTaskIterator {
@@ -349,7 +362,7 @@ static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
     if (!info.HasStatistics() && info.id() < num_row_groups) {
       auto row_group = metadata->RowGroup(info.id());
       info.set_num_rows(row_group->num_rows());
-      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+      info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group, manifest));
     }
   };
   std::for_each(row_groups.begin(), row_groups.end(), augment);
@@ -444,8 +457,39 @@ std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
   return result;
 }
 
+void RowGroupInfo::SetStatisticsExpression() {
+  if (!HasStatistics()) {
+    statistics_expression_ = nullptr;
+    return;
+  }
+
+  if (statistics_->value.empty()) {
+    statistics_expression_ = scalar(true);
+    return;
+  }
+
+  ExpressionVector expressions{statistics_->value.size()};
+
+  for (size_t i = 0; i < expressions.size(); ++i) {
+    const auto& col_stats =
+        internal::checked_cast<const StructScalar&>(*statistics_->value[i]);
+    auto field_expr = field_ref(statistics_->type->field(static_cast<int>(i))->name());
+
+    DCHECK_EQ(col_stats.value.size(), 2);
+    const auto& min = col_stats.value[0];
+    const auto& max = col_stats.value[1];
+
+    DCHECK_EQ(min->is_valid, max->is_valid);
+    expressions[i] = min->is_valid ? and_(greater_equal(field_expr, scalar(min)),
+                                          less_equal(field_expr, scalar(max)))
+                                   : equal(std::move(field_expr), scalar(min));
+  }
+
+  statistics_expression_ = and_(std::move(expressions));
+}
+
 bool RowGroupInfo::Satisfy(const Expression& predicate) const {
-  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_expression_);
 }
 
 ///
@@ -622,7 +666,7 @@ ParquetDatasetFactory::CollectParquetFragments(
       auto row_group = metadata.RowGroup(i);
       ARROW_ASSIGN_OR_RAISE(auto path,
                             FileFromRowGroup(filesystem_.get(), base_path_, *row_group));
-      auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+      auto stats = RowGroupStatisticsAsStructScalar(*row_group, manifest);
       auto num_rows = row_group->num_rows();
 
       // Insert the path, or increase the count of row groups. It will be
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index f5fba7e..182280e 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -134,8 +134,10 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
   explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
 
   /// \brief Construct a RowGroup from an identifier with statistics.
-  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
-      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<StructScalar> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {
+    SetStatisticsExpression();
+  }
 
   /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
   static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
@@ -150,10 +152,13 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
   int64_t num_rows() const { return num_rows_; }
   void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
 
-  /// \brief Return the RowGroup's statistics
-  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
-  void set_statistics(std::shared_ptr<Expression> statistics) {
+  /// \brief Return the RowGroup's statistics as a StructScalar with a field for
+  /// each column with statistics.
+  /// Each field will also be a StructScalar with "min" and "max" fields.
+  const std::shared_ptr<StructScalar>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<StructScalar> statistics) {
     statistics_ = std::move(statistics);
+    SetStatisticsExpression();
   }
 
   /// \brief Indicate if statistics are set.
@@ -169,9 +174,12 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
   bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
 
  private:
+  void SetStatisticsExpression();
+
   int id_;
   int64_t num_rows_;
-  std::shared_ptr<Expression> statistics_;
+  std::shared_ptr<Expression> statistics_expression_;
+  std::shared_ptr<StructScalar> statistics_;
 };
 
 /// \brief A FileFragment with parquet logic.
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 5ffb11f..8047c5f 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -845,6 +845,28 @@ cdef class RowGroupInfo:
     def num_rows(self):
         return self.info.num_rows()
 
+    @property
+    def statistics(self):
+        if not self.info.HasStatistics():
+            return None
+
+        cdef:
+            CStructScalar* c_statistics
+            CStructScalar* c_minmax
+
+        statistics = dict()
+        c_statistics = self.info.statistics().get()
+        for i in range(c_statistics.value.size()):
+            name = frombytes(c_statistics.type.get().field(i).get().name())
+            c_minmax = <CStructScalar*> c_statistics.value[i].get()
+
+            statistics[name] = {
+                'min': pyarrow_wrap_scalar(c_minmax.value[0]).as_py(),
+                'max': pyarrow_wrap_scalar(c_minmax.value[1]).as_py(),
+            }
+
+        return statistics
+
     def __eq__(self, other):
         if not isinstance(other, RowGroupInfo):
             return False
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index fc6ae62..7f416cb 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -886,6 +886,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     cdef cppclass CStringScalar" arrow::StringScalar"(CScalar):
         shared_ptr[CBuffer] value
 
+    cdef cppclass CStructScalar" arrow::StructScalar"(CScalar):
+        vector[shared_ptr[CScalar]] value
+
     shared_ptr[CScalar] MakeScalar[Value](Value value)
     shared_ptr[CScalar] MakeStringScalar" arrow::MakeScalar"(c_string value)
 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index f80fdb9..6823bd9 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -216,11 +216,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CRowGroupInfo "arrow::dataset::RowGroupInfo":
         CRowGroupInfo()
         CRowGroupInfo(int id)
-        CRowGroupInfo(
-            int id, int64_t n_rows, shared_ptr[CExpression] statistics)
         int id() const
         int64_t num_rows() const
         bint Equals(const CRowGroupInfo& other)
+        c_bool HasStatistics() const
+        shared_ptr[CStructScalar] statistics() const
 
     cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
             CFileFragment):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 08fec4d..2a59f62 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -715,6 +715,12 @@ def test_fragments_parquet_row_groups(tempdir):
     assert len(result) == 2
     assert result.equals(table.slice(0, 2))
 
+    assert row_group_fragments[0].row_groups is not None
+    assert row_group_fragments[0].row_groups[0].statistics == {
+        'f1': {'min': 0, 'max': 1},
+        'f2': {'min': 1, 'max': 1},
+    }
+
     fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0]
     row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1))
     assert len(row_group_fragments) == 1