You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/07/09 20:58:00 UTC

[GitHub] [arrow] wesm commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

wesm commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452483172



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -335,91 +335,39 @@ static inline bool RowGroupInfosAreComplete(const std::vector<RowGroupInfo>& inf
                      [](const RowGroupInfo& i) { return i.HasStatistics(); });
 }
 
-static inline std::vector<RowGroupInfo> FilterRowGroups(
-    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
-  auto filter = [&predicate](const RowGroupInfo& info) {
-    return !info.Satisfy(predicate);
-  };
-  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
-  row_groups.erase(end, row_groups.end());
-  return row_groups;
-}
-
-static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
-    std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
-  auto metadata = reader->parquet_reader()->metadata();
-  auto manifest = reader->manifest();
-  auto num_row_groups = metadata->num_row_groups();
-
-  if (row_groups.empty()) {
-    row_groups = RowGroupInfo::FromCount(num_row_groups);
-  }
-
-  // Augment a RowGroup with statistics if missing.
-  auto augment = [&](RowGroupInfo& info) {
-    if (!info.HasStatistics() && info.id() < num_row_groups) {
-      auto row_group = metadata->RowGroup(info.id());
-      info.set_num_rows(row_group->num_rows());
-      info.set_total_byte_size(row_group->total_byte_size());
-      info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group, manifest));
-    }
-  };
-  std::for_each(row_groups.begin(), row_groups.end(), augment);
-
-  return row_groups;
-}
-
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions> options,
                                                      std::shared_ptr<ScanContext> context,
                                                      FileFragment* fragment) const {
-  const auto& source = fragment->source();
-  auto row_groups = checked_cast<const ParquetFileFragment*>(fragment)->row_groups();
-
-  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
-  // The following block is required to avoid any IO if all RowGroups are
-  // excluded due to prior statistics knowledge.
-  if (row_groups_are_complete) {
-    // physical_schema should be cached at this point
-    ARROW_ASSIGN_OR_RAISE(auto physical_schema, fragment->ReadPhysicalSchema());
-    RETURN_NOT_OK(options->filter->Validate(*physical_schema));
-
-    // Apply a pre-filtering if the user requested an explicit sub-set of
-    // row-groups. In the case where a RowGroup doesn't have statistics
-    // metdata, it will not be excluded.
-    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  auto& parquet_fragment = checked_cast<ParquetFileFragment&>(*fragment);

Review comment:
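       Using a mutable pointer (e.g. `auto* parquet_fragment = checked_cast<ParquetFileFragment*>(fragment);`) instead of a non-const reference would be more style-conforming.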

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       Does this reference need to be mutable? This loop only reads `info.id()`, so `const RowGroupInfo&` should suffice.

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Augment a RowGroup with statistics if missing.
+    if (info.HasStatistics()) continue;

Review comment:
       Add braces around the body of this single-statement `if`.

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       mutable reference

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Augment a RowGroup with statistics if missing.
+    if (info.HasStatistics()) continue;
+
+    auto row_group = metadata->RowGroup(info.id());
+    auto statistics = RowGroupStatisticsAsStructScalar(*row_group, reader->manifest());

Review comment:
       Write out the types here instead of using `auto`?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();

Review comment:
       Write out these types instead of using `auto`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org