You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/27 13:48:35 UTC

[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #8507: ARROW-10131: [C++][Dataset][Python] Lazily parse parquet metadata

jorisvandenbossche commented on a change in pull request #8507:
URL: https://github.com/apache/arrow/pull/8507#discussion_r512072709



##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -322,6 +254,13 @@ struct ParquetFactoryOptions {
   // This is useful for partitioning which parses directory when ordering
   // is important, e.g. DirectoryPartitioning.
   std::string partition_base_dir;
+
+  // Assert that all ColumnChunk paths are consitent. The parquet spec allows for

Review comment:
       ```suggestion
     // Assert that all ColumnChunk paths are consistent. The parquet spec allows for
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -162,8 +148,7 @@ static std::shared_ptr<StructScalar> ColumnChunkStatisticsAsStructScalar(
 
   // Optimize for corner case where all values are nulls
   if (statistics->num_values() == statistics->null_count()) {
-    auto null = MakeNullScalar(field->type());
-    return MakeMinMaxScalar(null, null);
+    return equal(std::move(field_expr), scalar(MakeNullScalar(field->type())));

Review comment:
       I am not sure an "equal" expression is correct for nulls, since nulls are never equal
   
   So eg filtering on `filter=ds.field("col") == pa.NULL` during a scan will not work to select the nulls, I suppose.

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -208,43 +146,34 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
 /// number of scanned RowGroups, or to partition the scans across multiple
 /// threads.
 ///
-/// It can also attach optional statistics with each RowGroups, providing
-/// pushdown predicate benefits before invoking any heavy IO. This can induce
+/// Metadata can be explicitly provided, enabling pushdown predicate benefits without
+/// the potentially heavy IO of loading Metadata from the file system. This can induce
 /// significant performance boost when scanning high latency file systems.
 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
  public:
   Result<FragmentVector> SplitByRowGroup(const std::shared_ptr<Expression>& predicate);
 
-  /// \brief Return the RowGroups selected by this fragment, or nullptr
-  /// if all RowGroups in the parquet file are selected.
-  const std::vector<RowGroupInfo>* row_groups();
-
-  /// \brief Return the number of row groups selected by this fragment.
-  Result<int> GetNumRowGroups();
+  /// \brief Return the RowGroups selected by this fragment.
+  const std::vector<int>& row_groups() const { return row_groups_; }
 
-  /// \brief Indicate if the attached statistics are complete and the physical schema
-  /// is cached.
-  ///
-  /// The statistics are complete if the provided RowGroups (see `row_groups()`)
-  /// is not empty / and all RowGroup return true on `RowGroup::HasStatistics()`.
-  bool HasCompleteMetadata() const { return has_complete_metadata_; }
+  /// \brief Return the FileMetaData associated with this fragment.
+  const std::shared_ptr<parquet::FileMetaData>& metadata() const { return metadata_; }
 
-  /// \brief Ensure attached statistics are complete and the physical schema is cached.
+  /// \brief Ensure this fragment's FileMetaData is in memory.
   Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR);
 
   /// \brief Return a filtered subset of the ParquetFileFragment.
   Result<std::shared_ptr<Fragment>> Subset(const std::shared_ptr<Expression>& predicate);
-  Result<std::shared_ptr<Fragment>> Subset(const std::vector<int> row_group_ids);
+  Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);
 
  private:
   ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                       std::shared_ptr<Expression> partition_expression,
                       std::shared_ptr<Schema> physical_schema,
-                      std::vector<RowGroupInfo> row_groups);
+                      std::vector<int> row_groups, bool select_all_row_groups = false);

Review comment:
       This `select_all_row_groups` is to distinguish between empty row groups selection as actual selection (of no row groups) vs no selection? 
   Can you add a comment about that somewhere? (maybe below where `select_all_row_groups_` is declared)

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -907,66 +908,61 @@ cdef class FileFragment(Fragment):
         return FileFormat.wrap(self.file_fragment.format())
 
 
-cdef class RowGroupInfo(_Weakrefable):
+class RowGroupInfo:
     """A wrapper class for RowGroup information"""
 
-    cdef:
-        CRowGroupInfo info
-
-    def __init__(self, int id):
-        cdef CRowGroupInfo info = CRowGroupInfo(id)
-        self.init(info)
-
-    cdef void init(self, CRowGroupInfo info):
-        self.info = info
-
-    @staticmethod
-    cdef wrap(CRowGroupInfo info):
-        cdef RowGroupInfo self = RowGroupInfo.__new__(RowGroupInfo)
-        self.init(info)
-        return self
-
-    @property
-    def id(self):
-        return self.info.id()
+    def __init__(self, id, metadata=None, schema=None):
+        self.id = id
+        self.metadata = metadata
+        self.schema = schema
 
     @property
     def num_rows(self):
-        return self.info.num_rows()
+        if self.metadata is None:
+            return None

Review comment:
       Can ``self.metadata`` ever be None? 
   It seems this loaded from the file on access if it was not cached (which is fine I think)

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1001,21 +997,26 @@ cdef class ParquetFileFragment(FileFragment):
 
     @property
     def row_groups(self):
-        cdef:
-            const vector[CRowGroupInfo]* c_row_groups
-        c_row_groups = self.parquet_file_fragment.row_groups()
-        if c_row_groups == nullptr:
-            return None
-        return [RowGroupInfo.wrap(c_row_groups.at(i))
-                for i in range(c_row_groups.size())]
+        metadata = self.metadata
+        cdef vector[int] row_groups = self.parquet_file_fragment.row_groups()
+        return [RowGroupInfo(i, metadata.row_group(i), self.physical_schema)
+                for i in row_groups]
+
+    @property
+    def metadata(self):
+        self.ensure_complete_metadata()
+        cdef FileMetaData metadata = FileMetaData.__new__(FileMetaData)

Review comment:
       ```suggestion
           cdef FileMetaData metadata = FileMetaData()
   ```
   
   

##########
File path: cpp/src/arrow/dataset/filter.cc
##########
@@ -1337,7 +1351,11 @@ Result<std::shared_ptr<RecordBatch>> TreeEvaluator::Filter(
   return batch->Slice(0, 0);
 }
 
-std::shared_ptr<Expression> scalar(bool value) { return scalar(MakeScalar(value)); }
+const std::shared_ptr<Expression>& scalar(bool value) {
+  static auto true_ = scalar(MakeScalar(true));
+  static auto false_ = scalar(MakeScalar(false));
+  return value ? true_ : false_;
+}

Review comment:
       Just out of curiosity to understand, what's the reason for the above change?

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1935,14 +1934,21 @@ cdef class ParquetFactoryOptions(_Weakrefable):
         have partition information.
     partitioning : Partitioning, PartitioningFactory, optional
         The partitioning scheme applied to fragments, see ``Partitioning``.
+    validate_column_chunk_paths : bool, default False
+        Assert that all ColumnChunk paths are consitent. The parquet spec

Review comment:
       ```suggestion
           Assert that all ColumnChunk paths are consistent. The parquet spec
   ```

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -1102,23 +1092,14 @@ def test_fragments_parquet_subset_ids(tempdir):
     # select with row group ids
     subfrag = fragment.subset(row_group_ids=[0, 3])
     assert subfrag.num_row_groups == 2
-    # the row_groups list is initialized, but don't have statistics
-    assert len(subfrag.row_groups) == 2
-    assert subfrag.row_groups[0].statistics is None
+    assert subfrag.row_groups == [fragment.row_groups[i] for i in [0, 3]]
+
     # check correct scan result of subset
     result = subfrag.to_table()
     assert result.to_pydict() == {"f1": [0, 3], "f2": [1, 1]}
 
-    # if the original fragment has statistics -> preserve them
-    fragment.ensure_complete_metadata()
-    subfrag = fragment.subset(row_group_ids=[0, 3])
-    assert subfrag.num_row_groups == 2

Review comment:
       Can we keep (part of) this test? But for example ensure there is no file IO after the `subset()` call to ensure the metadata are set again on the the subset

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -905,13 +896,12 @@ def test_fragments_parquet_ensure_metadata(tempdir, open_logging_fs):
     assert row_group.num_rows == 2
     assert row_group.statistics is not None
 
-    # pickling preserves row group ids but not statistics
+    # pickling preserves row group ids and statistics

Review comment:
       the test doesn't really test this aspect, though? (the fact that pickling preserved the statistics, as if they were not preserved, we would load them on access) Maybe putting an `with assert_opens([]):` around the asserts after loading the pickle?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -1102,23 +1092,14 @@ def test_fragments_parquet_subset_ids(tempdir):
     # select with row group ids
     subfrag = fragment.subset(row_group_ids=[0, 3])
     assert subfrag.num_row_groups == 2
-    # the row_groups list is initialized, but don't have statistics
-    assert len(subfrag.row_groups) == 2
-    assert subfrag.row_groups[0].statistics is None
+    assert subfrag.row_groups == [fragment.row_groups[i] for i in [0, 3]]
+
     # check correct scan result of subset
     result = subfrag.to_table()
     assert result.to_pydict() == {"f1": [0, 3], "f2": [1, 1]}
 
-    # if the original fragment has statistics -> preserve them
-    fragment.ensure_complete_metadata()
-    subfrag = fragment.subset(row_group_ids=[0, 3])
-    assert subfrag.num_row_groups == 2
-    assert len(subfrag.row_groups) == 2
-    assert subfrag.row_groups[0].statistics is not None
-
     # empty list of ids
     subfrag = fragment.subset(row_group_ids=[])
-    assert subfrag.num_row_groups == 0

Review comment:
       This is something we can keep? (it should still be true, and the `num_row_groups` is independent from `.row_groups` on the python side)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org