You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/07/09 17:57:59 UTC

[GitHub] [arrow] bkietz opened a new pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

bkietz opened a new pull request #7692:
URL: https://github.com/apache/arrow/pull/7692


   Populate ParquetFileFragment statistics whenever a reader is opened anyway. Also provide an explicit method for forcing statistics to be loaded. (I exposed this as a public method, but maybe we'd prefer to hide it inside the `statistics` property, the way we do for the physical schema?)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452486948



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       This one doesn't

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       yes; in this loop row group infos which have only partial statistics are hydrated from the reader




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452408096



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -909,13 +909,24 @@ cdef class ParquetFileFragment(FileFragment):
 
     def __reduce__(self):
         buffer = self.buffer
+        if self.row_groups is not None:

Review comment:
       Yeah, I actually realized we were not yet pickling the row group id's when discussing this in the meeting we had, and was planning to open a JIRA / do a quick PR, but you already fixed it ;)
   
   (it didn't fail because we simply didn't include any row group information in the serialization)
   
   Only preserving the rowgroup id's (as you do here) should be sufficient for now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452490750



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       If you prefer I can use an index and assign to a vector element that way




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452394725



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -909,13 +909,24 @@ cdef class ParquetFileFragment(FileFragment):
 
     def __reduce__(self):
         buffer = self.buffer
+        if self.row_groups is not None:

Review comment:
       When I started working on the python binding, the compiler complained that RowGroupInfo is not pickleable. I'm not sure why this didn't fail before; we've never had RowGroupInfo.__reduce__ and ParquetFileFormat.make_fragment only accepts a list of integers for the row_groups argument




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452472840



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -909,13 +909,24 @@ cdef class ParquetFileFragment(FileFragment):
 
     def __reduce__(self):
         buffer = self.buffer
+        if self.row_groups is not None:

Review comment:
       Note that (due to stats containing scalars) pickling of RowGroupInfo depends on pickling of Scalars. Added ARROW-9394




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452477463



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -909,13 +909,24 @@ cdef class ParquetFileFragment(FileFragment):
 
     def __reduce__(self):
         buffer = self.buffer
+        if self.row_groups is not None:

Review comment:
       Indeed. Based on my discussions with @rjzamora earlier today, the row group id's are the most critical to serialize (which this PR thus already fixes), and should cover the short-term needs for dask's use cases.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452486948



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       yes; in this loop row group infos which have only partial statistics are hydrated from the reader




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche edited a comment on pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche edited a comment on pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#issuecomment-656338363


   > I mean it could be called inside the statistics property accessor so that the returned statistics are never None
   
   Ah, I misunderstood. That might also be nice (avoiding the extra method). Now, the problem with that approach, AFAIU, is that right now `fragment.row_groups` is `None` if no statistics are loaded yet. 
   So it would also need to happen when accessing the `fragment.row_groups` property. But since that can sometimes already return a list of RowGroupInfo objects with only the ids, that might get a bit complicated (as several properties would then first need to ensure the metadata is loaded: both `row_groups` if it does not yet exist, and then `RowGroupInfo.statistics`/`num_rows` for RowGroupInfo objects that only had the `id` set).
   
   So although the automatic "ensure_metadata" on access of the properties sounds nice, in the end an explicit method to call yourself might be cleaner, I think (also for downstream users like dask)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] wesm closed pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
wesm closed pull request #7692:
URL: https://github.com/apache/arrow/pull/7692


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452487124



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       This mutable reference is necessary; in this loop row group infos which have only partial statistics are hydrated from the reader




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#issuecomment-656338363


   > I mean it could be called inside the statistics property accessor so that the returned statistics are never None
   
   Ah, I misunderstood. That might also be nice (avoiding the extra method). Now, the problem with that approach, AFAIU, is that right now `fragment.row_groups` is `None` if no statistics are loaded yet. 
   So it would also need to happen when accessing the `fragment.row_groups` property. But since that can sometimes already return a list of RowGroupInfo objects with only the ids, that might get a bit complicated (as several properties would then first need to ensure the metadata is loaded).
   
   So although the automatic "ensure_metadata" on access of the properties sounds nice, in the end an explicit method to call yourself might be cleaner, I think (also for downstream users like dask)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452488092



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Augment a RowGroup with statistics if missing.
+    if (info.HasStatistics()) continue;

Review comment:
       The style guide doesn't require them and I don't think this is less readable https://google.github.io/styleguide/cppguide.html#Conditionals




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452408096



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -909,13 +909,24 @@ cdef class ParquetFileFragment(FileFragment):
 
     def __reduce__(self):
         buffer = self.buffer
+        if self.row_groups is not None:

Review comment:
       Yeah, I actually realized we were not yet pickling the row group id's when discussing this in the meeting we had, and was planning to open a JIRA / do a quick PR, but you already fixed it ;)
   
   (it didn't fail ~~because we simply didn't include any row group information in the serialization~~ because we only tested cases where row_groups was None)
   
   Only preserving the rowgroup id's (as you do here) should be sufficient for now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#issuecomment-657258482


   @wesm PTAL


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#issuecomment-656271917


   https://issues.apache.org/jira/browse/ARROW-9321


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] wesm commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
wesm commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452483172



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -335,91 +335,39 @@ static inline bool RowGroupInfosAreComplete(const std::vector<RowGroupInfo>& inf
                      [](const RowGroupInfo& i) { return i.HasStatistics(); });
 }
 
-static inline std::vector<RowGroupInfo> FilterRowGroups(
-    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
-  auto filter = [&predicate](const RowGroupInfo& info) {
-    return !info.Satisfy(predicate);
-  };
-  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
-  row_groups.erase(end, row_groups.end());
-  return row_groups;
-}
-
-static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
-    std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
-  auto metadata = reader->parquet_reader()->metadata();
-  auto manifest = reader->manifest();
-  auto num_row_groups = metadata->num_row_groups();
-
-  if (row_groups.empty()) {
-    row_groups = RowGroupInfo::FromCount(num_row_groups);
-  }
-
-  // Augment a RowGroup with statistics if missing.
-  auto augment = [&](RowGroupInfo& info) {
-    if (!info.HasStatistics() && info.id() < num_row_groups) {
-      auto row_group = metadata->RowGroup(info.id());
-      info.set_num_rows(row_group->num_rows());
-      info.set_total_byte_size(row_group->total_byte_size());
-      info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group, manifest));
-    }
-  };
-  std::for_each(row_groups.begin(), row_groups.end(), augment);
-
-  return row_groups;
-}
-
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions> options,
                                                      std::shared_ptr<ScanContext> context,
                                                      FileFragment* fragment) const {
-  const auto& source = fragment->source();
-  auto row_groups = checked_cast<const ParquetFileFragment*>(fragment)->row_groups();
-
-  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
-  // The following block is required to avoid any IO if all RowGroups are
-  // excluded due to prior statistics knowledge.
-  if (row_groups_are_complete) {
-    // physical_schema should be cached at this point
-    ARROW_ASSIGN_OR_RAISE(auto physical_schema, fragment->ReadPhysicalSchema());
-    RETURN_NOT_OK(options->filter->Validate(*physical_schema));
-
-    // Apply a pre-filtering if the user requested an explicit sub-set of
-    // row-groups. In the case where a RowGroup doesn't have statistics
-    // metdata, it will not be excluded.
-    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  auto& parquet_fragment = checked_cast<ParquetFileFragment&>(*fragment);

Review comment:
       Using a mutable pointer would be more style-conforming

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       does this reference need to be mutable?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Augment a RowGroup with statistics if missing.
+    if (info.HasStatistics()) continue;

Review comment:
       add braces

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       mutable reference

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Augment a RowGroup with statistics if missing.
+    if (info.HasStatistics()) continue;
+
+    auto row_group = metadata->RowGroup(info.id());
+    auto statistics = RowGroupStatisticsAsStructScalar(*row_group, reader->manifest());

Review comment:
       Write out the types here?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();

Review comment:
       write out these types?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] wesm commented on pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
wesm commented on pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#issuecomment-657282470


   The VS 2019 failure looks spurious; rebasing to see if that fixes it.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#issuecomment-656523653


   I added some Python tests for the new `ensure_complete_metadata` and for pickling.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452486734



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -335,91 +335,39 @@ static inline bool RowGroupInfosAreComplete(const std::vector<RowGroupInfo>& inf
                      [](const RowGroupInfo& i) { return i.HasStatistics(); });
 }
 
-static inline std::vector<RowGroupInfo> FilterRowGroups(
-    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
-  auto filter = [&predicate](const RowGroupInfo& info) {
-    return !info.Satisfy(predicate);
-  };
-  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
-  row_groups.erase(end, row_groups.end());
-  return row_groups;
-}
-
-static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
-    std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
-  auto metadata = reader->parquet_reader()->metadata();
-  auto manifest = reader->manifest();
-  auto num_row_groups = metadata->num_row_groups();
-
-  if (row_groups.empty()) {
-    row_groups = RowGroupInfo::FromCount(num_row_groups);
-  }
-
-  // Augment a RowGroup with statistics if missing.
-  auto augment = [&](RowGroupInfo& info) {
-    if (!info.HasStatistics() && info.id() < num_row_groups) {
-      auto row_group = metadata->RowGroup(info.id());
-      info.set_num_rows(row_group->num_rows());
-      info.set_total_byte_size(row_group->total_byte_size());
-      info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group, manifest));
-    }
-  };
-  std::for_each(row_groups.begin(), row_groups.end(), augment);
-
-  return row_groups;
-}
-
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions> options,
                                                      std::shared_ptr<ScanContext> context,
                                                      FileFragment* fragment) const {
-  const auto& source = fragment->source();
-  auto row_groups = checked_cast<const ParquetFileFragment*>(fragment)->row_groups();
-
-  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
-  // The following block is required to avoid any IO if all RowGroups are
-  // excluded due to prior statistics knowledge.
-  if (row_groups_are_complete) {
-    // physical_schema should be cached at this point
-    ARROW_ASSIGN_OR_RAISE(auto physical_schema, fragment->ReadPhysicalSchema());
-    RETURN_NOT_OK(options->filter->Validate(*physical_schema));
-
-    // Apply a pre-filtering if the user requested an explicit sub-set of
-    // row-groups. In the case where a RowGroup doesn't have statistics
-    // metdata, it will not be excluded.
-    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  auto& parquet_fragment = checked_cast<ParquetFileFragment&>(*fragment);

Review comment:
       okay




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452491386



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();

Review comment:
       If you insist, otherwise I'd prefer to leave it as `auto`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
bkietz commented on pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#issuecomment-656331228


   @jorisvandenbossche 
   > I exposed this as a public method, but maybe we'd prefer to hide it inside the statistics property the way we do physical schema?
   
   What I mean is that it could be called inside the `statistics` property accessor so that the returned statistics are never `None`, similar to the way `physical_schema` will read from disk if the schema is not cached.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] wesm commented on a change in pull request #7692: ARROW-9321: [C++][Dataset] Populate statistics opportunistically

Posted by GitBox <gi...@apache.org>.
wesm commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452488952



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but ",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Augment a RowGroup with statistics if missing.
+    if (info.HasStatistics()) continue;

Review comment:
       OK, that sounds fine then. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org