You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/30 20:32:04 UTC

[GitHub] [arrow] bkietz commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

bkietz commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r661774861



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3025,14 +3072,68 @@ def _filesystemdataset_write(
         CFileSystemDatasetWriteOptions c_options
         shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
+        dict visit_args
+        function[cb_writer_finish] c_post_finish_cb
 
     c_options.file_write_options = file_options.unwrap()
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    c_post_finish_cb = _filesystemdataset_write_visitor

Review comment:
       This seems to be unused

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3095,10 +3104,24 @@ def test_write_dataset_use_threads(tempdir):
         pa.schema([("part", pa.string())]), flavor="hive")
 
     target1 = tempdir / 'partitioned1'
+    paths_written = []
+
+    def file_visitor(written_file):
+        paths_written.append(written_file.path)
+
     ds.write_dataset(
         dataset, target1, format="feather", partitioning=partitioning,
-        use_threads=True
+        use_threads=True, file_visitor=file_visitor
     )
+    expected_paths = [
+        target1 / 'part=a' / 'part-0.feather',
+        target1 / 'part=a' / 'part-1.feather',
+        target1 / 'part=b' / 'part-0.feather',
+        target1 / 'part=b' / 'part-1.feather'
+    ]
+    for path in paths_written:
+        assert pathlib.Path(path) in expected_paths

Review comment:
       This technically only checks that `paths_written` is a subset of `expected_paths`; compare the two as sets so that missing paths are caught as well:
   ```suggestion
       assert set(map(pathlib.Path, paths_written)) == {
           target1 / 'part=a' / 'part-0.feather',
           target1 / 'part=a' / 'part-1.feather',
           target1 / 'part=b' / 'part-0.feather',
           target1 / 'part=b' / 'part-1.feather',
       }
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3025,14 +3072,68 @@ def _filesystemdataset_write(
         CFileSystemDatasetWriteOptions c_options
         shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
+        dict visit_args
+        function[cb_writer_finish] c_post_finish_cb
 
     c_options.file_write_options = file_options.unwrap()
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    c_post_finish_cb = _filesystemdataset_write_visitor
+    if file_visitor is not None:
+        visit_args = {'base_dir': c_options.base_dir,
+                      'file_visitor': file_visitor}
+        c_options.writer_post_finish = BindFunction[cb_writer_finish_internal](
+            &_filesystemdataset_write_visitor, visit_args)
 
     c_scanner = data.unwrap()
     with nogil:
         check_status(CFileSystemDataset.Write(c_options, c_scanner))
+
+
+# basic test to roundtrip through a BoundFunction

Review comment:
       This should probably be moved to `_common.pyx`, or maybe inlined in `test_cython.py`

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):
+    """
+    Metadata information about files written as
+    part of a dataset write operation
+    """
+
+    """The full path to the created file"""
+    cdef public str path
+    """If the file is a parquet file this will contain the parquet metadata"""
+    cdef public object metadata
+
+    def __init__(self, path, metadata):
+        self.path = path
+        self.metadata = metadata
+
+cdef void _filesystemdataset_write_visitor(
+        dict visit_args,
+        CFileWriter* file_writer):
+    cdef:
+        str path
+        str base_dir
+        WrittenFile written_file
+        FileMetaData parquet_metadata
+        CParquetFileWriter* parquet_file_writer
+
+    if file_writer == nullptr:

Review comment:
       When would this happen? That seems like it should be considered a pure error on the part of `FileSystemDataset::Write`.

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):
+    """
+    Metadata information about files written as
+    part of a dataset write operation
+    """
+
+    """The full path to the created file"""
+    cdef public str path
+    """If the file is a parquet file this will contain the parquet metadata"""
+    cdef public object metadata
+
+    def __init__(self, path, metadata):
+        self.path = path
+        self.metadata = metadata
+
+cdef void _filesystemdataset_write_visitor(
+        dict visit_args,
+        CFileWriter* file_writer):
+    cdef:
+        str path
+        str base_dir
+        WrittenFile written_file
+        FileMetaData parquet_metadata
+        CParquetFileWriter* parquet_file_writer
+
+    if file_writer == nullptr:
+        return
+
+    parquet_metadata = None
+    path = frombytes(deref(file_writer).destination().path)
+    base_dir = frombytes(visit_args['base_dir'])

Review comment:
       Please move this into the `if metadata:` block, since it's only used there.

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):

Review comment:
       Since this is essentially a named tuple with optional properties based on the format, I think it'd be better to just use a `dict` instead




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org