You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/04/21 13:45:41 UTC

[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7000: ARROW-8065: [C++][Dataset] Refactor ScanOptions and Fragment relation

jorisvandenbossche commented on a change in pull request #7000:
URL: https://github.com/apache/arrow/pull/7000#discussion_r412150862



##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -30,12 +30,22 @@
 namespace arrow {
 namespace dataset {
 
-/// \brief A granular piece of a Dataset, such as an individual file, which can be
-/// read/scanned separately from other fragments.
+/// \brief A granular piece of a Dataset, such as an individual file.
 ///
-/// A Fragment yields a collection of RecordBatch, encapsulated in one or more ScanTasks.
+/// A Fragment can be read/scanned separately from other fragments. It yields a
+/// collection of RecordBatch, encapsulated in one or more ScanTasks.
+///
+/// A notable difference from Dataset is that Fragments have physical schemas
+/// which may differ from Fragments.

Review comment:
       ```suggestion
   /// which may differ from other Fragments.
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -519,30 +500,69 @@ cdef class Fragment:
         """
         return Expression.wrap(self.fragment.partition_expression())
 
-    def to_table(self, use_threads=True, MemoryPool memory_pool=None):
-        """Convert this Fragment into a Table.
+    def _scanner(self, **kwargs):
+        return Scanner.from_fragment(self, **kwargs)
 
-        Use this convenience utility with care. This will serially materialize
-        the Scan result in memory before creating the Table.
+    def scan(self, columns=None, filter=None, use_threads=True,
+             MemoryPool memory_pool=None, **kwargs):
+        """Builds a scan operation against the dataset.
+

Review comment:
       When using Fragment.scan, it uses the Fragment's physical schema for the resulting table? (since the Fragment is not aware of the dataset "read" schema?) 
   If so, we should note that here in the docstring I think

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -433,26 +430,22 @@ Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
   }
   FragmentVector fragments(row_groups.size());
 
-  auto new_options = std::make_shared<ScanOptions>(*fragment.scan_options());
-  if (!extra_filter->Equals(true)) {
-    new_options->filter = and_(std::move(extra_filter), std::move(new_options->filter));
-  }
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          new_options->filter, std::move(row_groups));
+  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), extra_filter,

Review comment:
       We should probably rename "extra_filter" to just "filter" or "predicate" as how it is called in Dataset::GetFragments, since it is no longer "extra" ?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -671,41 +669,29 @@ def test_fragments(tempdir):
     f = fragments[0]
 
     # file's schema does not include partition column
-    phys_schema = f.schema.remove(f.schema.get_field_index('part'))
-    assert f.format.inspect(f.path, f.filesystem) == phys_schema
+    assert f.physical_schema.names == ['f1', 'f2']
+    assert f.format.inspect(f.path, f.filesystem) == f.physical_schema
     assert f.partition_expression.equals(ds.field('part') == 'a')
 
     # scanning fragment includes partition columns
-    result = f.to_table()
-    assert f.schema == result.schema
+    result = f.to_table(schema=dataset.schema)

Review comment:
       Can you also test without passing the dataset's schema, and assert that the column_names are [f1, f2] ?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -671,41 +669,29 @@ def test_fragments(tempdir):
     f = fragments[0]
 
     # file's schema does not include partition column
-    phys_schema = f.schema.remove(f.schema.get_field_index('part'))
-    assert f.format.inspect(f.path, f.filesystem) == phys_schema
+    assert f.physical_schema.names == ['f1', 'f2']
+    assert f.format.inspect(f.path, f.filesystem) == f.physical_schema
     assert f.partition_expression.equals(ds.field('part') == 'a')
 
     # scanning fragment includes partition columns
-    result = f.to_table()
-    assert f.schema == result.schema
+    result = f.to_table(schema=dataset.schema)
     assert result.column_names == ['f1', 'f2', 'part']
-    assert len(result) == 4
     assert result.equals(table.slice(0, 4))
-
-    # scanning fragments follow column projection
-    fragments = list(dataset.get_fragments(columns=['f1', 'part']))

Review comment:
       Keep this but where the columns selection is passe to `to_table` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org