You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/30 04:29:58 UTC

[GitHub] [arrow] westonpace opened a new pull request #10628: ARROW-12364: [C++][Dataset][Python] Add a callback to visit file writers just before Finish()

westonpace opened a new pull request #10628:
URL: https://github.com/apache/arrow/pull/10628


   Created writer_post_finish (similar to writer_pre_finish) to visit dataset-created files after Finish.  Added a similar file_visitor concept to pyarrow which maps to writer_post_finish.  Connected the legacy metadata_collector to the file_visitor so that parquet datasets created with use_legacy_dataset=True can support metadata_collector.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665251058



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3095,10 +3104,30 @@ def test_write_dataset_use_threads(tempdir):
         pa.schema([("part", pa.string())]), flavor="hive")
 
     target1 = tempdir / 'partitioned1'
+    paths_written = []
+
+    def file_visitor(written_file):
+        print(f'Visiting {written_file.path}')

Review comment:
       Oops.  Removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r666601945



##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,12 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.  This object will contain
+        the path and (if the dataset is a parquet dataset) the parquet

Review comment:
       Ah, I thought WrittenFile was exposed.  I've improved the docstring here.  For my education, what is the concern with users relying on this class?  It seems less brittle than users relying on a snippet of documentation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#issuecomment-873287330


   Thanks for the review @bkietz.  I've addressed the changes you requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665539346



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,57 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+    f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': f2_vals})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )

Review comment:
       Actually, this way it is of course not using the write_to_dataset non-legacy version, so if we are not doing that, we could also keep the original without passing the `use_legacy_dataset` keyword.

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,57 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+    f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': f2_vals})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )

Review comment:
       ```suggestion
       root_path.mkdir()
   
       for i in range(4):
           table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
           path = root_path / f"part-{i}.parquet"
           pq.write_table(table, str(path), metadata_collector=metadata_collector)
           # set the file path relative to the root of the partitioned dataset
           metadata_collector[-1].set_file_path(f"part-{i}.parquet")
   ```
   
   Looking a bit more in detail, the goal of this helper method is actually to create a non-partitioned dataset (so just a directory with flat files), since the partitioned case is tested a bit more below with the helper function `_create_parquet_dataset_partitioned`. So my code suggestion here adapts it to use plain `write_table` (to overcome the issue that `write_to_dataset` generates identical files (and thus overwrites)

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,

Review comment:
       With my suggestion above, those changes can all be reverted, I think (it was changing the intent of the tests)

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,57 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):

Review comment:
       ```suggestion
   def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
       """
       Creates a simple (flat files, no nested partitioning) Parquet dataset
       """
   ```
   
   (to clarify the intent)

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,57 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+    f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': f2_vals})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )

Review comment:
       But so there is a `_create_parquet_dataset_partitioned` a little bit below, and I think it is that one that should be parametrized with `use_legacy_dataset=True/False`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10628: ARROW-12364: [C++][Dataset][Python] Add a callback to visit file writers just before Finish()

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#issuecomment-871086977


   https://issues.apache.org/jira/browse/ARROW-12364


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r666602953



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,57 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+    f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': f2_vals})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )

Review comment:
       I also ended up spinning myself around a few times here.  What I wanted is a test to ensure that we can round trip table -> _metadata -> factory -> dataset -> table using the metadata_collector.  However, changing use_legacy_dataset here fails (since the new dataset doesn't support append, a.k.a ARROW-12358).  An, both use_legacy_dataset versions fail when there is a partition because of ARROW-13269.
   
   So I created a new, simple, test which tests the round trip with a single file and no append and no partitioning.  Tests for the other scenarios can be fleshed out when those issues are addressed.  I then restored all the old tests as they were.  Hopefully this works.

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,57 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):

Review comment:
       Added.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz closed pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
bkietz closed pull request #10628:
URL: https://github.com/apache/arrow/pull/10628


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665248325



##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.

Review comment:
       I added a bit more details as suggested.  I added the bit about the parquet metadata and the written file path in the WrittenFile.metadata docstring.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665250153



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2810,11 +2819,11 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
 
     # filtering fragments should not open any file
     with assert_opens([]):
-        list(dataset.get_fragments(ds.field("f1") > 15))
+        list(dataset.get_fragments(ds.field("f2") > 15))

Review comment:
       I changed f2 to be int64 values 0, 10, 20, 30.  f1 is no longer usable because it got removed from the schema because it was a partition column.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663270490



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3025,14 +3072,68 @@ def _filesystemdataset_write(
         CFileSystemDatasetWriteOptions c_options
         shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
+        dict visit_args
+        function[cb_writer_finish] c_post_finish_cb
 
     c_options.file_write_options = file_options.unwrap()
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    c_post_finish_cb = _filesystemdataset_write_visitor

Review comment:
       Removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r666792863



##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,24 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.  This object will have both
+        a path attribute and a metadata attribute.
+
+        The path attribute will a str containing the absolute path to

Review comment:
       Based on a quick test, it are not always absolute paths, but it depends on what has been passed as base_dir for the written dataset (whether that's an absolute or relative path).
   
   ```suggestion
           The path attribute will be a string containing the path to
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#issuecomment-877445111


   @jorisvandenbossche Thank you for all the review.  I have incorporated your changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665439952



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,

Review comment:
       Nit: for consistency, shouldn't this be called "physical_schema"?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663986392



##########
File path: python/pyarrow/parquet.py
##########
@@ -1958,8 +1958,11 @@ def write_to_dataset(table, root_path, partition_cols=None,
             "implementation."
         )
         metadata_collector = kwargs.pop('metadata_collector', None)
+        file_visitor = None
         if metadata_collector is not None:
-            raise ValueError(msg.format("metadata_collector"))
+            def file_visitor(written_file):
+                if written_file.metadata:

Review comment:
       Is this `if` check needed (in theory)? Since this will always be parquet files in this case

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,
         metadata_collector=metadata_collector
     )
-    return metadata_path, table
+    return metadata_path, partitionless_schema
 
 
 @pytest.mark.parquet
 @pytest.mark.pandas  # write_to_dataset currently requires pandas
-def test_parquet_dataset_factory(tempdir):
+@pytest.mark.parametrize('use_legacy_dataset', [False, True])
+def test_parquet_dataset_factory(tempdir, use_legacy_dataset):
     root_path = tempdir / "test_parquet_dataset"
-    metadata_path, table = _create_parquet_dataset_simple(root_path)
+    metadata_path, partitionless_schema = _create_parquet_dataset_simple(
+        root_path, use_legacy_dataset)
     dataset = ds.parquet_dataset(metadata_path)
-    assert dataset.schema.equals(table.schema)
+    assert dataset.schema.equals(partitionless_schema)

Review comment:
       This doesn't seem right to me? I think the dataset's schema should include the partition columns?

##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.

Review comment:
       Should we also mention that the Parquet metadata has been updated with the written file path?

##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.

Review comment:
       Since "WrittenFile" is not a generally know pyarrow class, I would give a bit more details on this (eg the fact that you can get the path and (if parquet) metadata. 
   
   And maybe also give an example use case, something like "For example, this enables to collect the paths or metadata of all written files"

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,

Review comment:
       Is this change needed?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2810,11 +2819,11 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
 
     # filtering fragments should not open any file
     with assert_opens([]):
-        list(dataset.get_fragments(ds.field("f1") > 15))
+        list(dataset.get_fragments(ds.field("f2") > 15))

Review comment:
       The "f2" column is random normal around 0, so this filter will never yield anything

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3269,8 +3308,19 @@ def test_write_dataset_parquet(tempdir):
 
     # using default "parquet" format string
 
+    files_correct_metadata = 0
+
+    def file_visitor(written_file):
+        nonlocal files_correct_metadata
+        if (written_file.metadata is not None and
+                written_file.metadata.num_columns == 3):
+            files_correct_metadata += 1

Review comment:
       Can you maybe move this into a separate test? (it makes reading the current test for basic functionality a bit complicated IMO)

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2795,7 +2804,7 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
     # created with ParquetDatasetFactory from a _metadata file
 
     root_path = tempdir / "test_parquet_dataset_lazy_filtering"
-    metadata_path, _ = _create_parquet_dataset_simple(root_path)
+    metadata_path, _ = _create_parquet_dataset_simple(root_path, True)

Review comment:
       You can also add a default for this keyword in the helper function

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3130,19 +3159,29 @@ def test_write_table(tempdir):
 
     # with partitioning
     base_dir = tempdir / 'partitioned'
+    expected_paths = [
+        base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow",
+        base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow"
+    ]
+
+    visited_paths = []
+
+    def file_visitor(written_file):
+        nonlocal visited_paths

Review comment:
       Is this `nonlocal` needed?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3095,10 +3104,30 @@ def test_write_dataset_use_threads(tempdir):
         pa.schema([("part", pa.string())]), flavor="hive")
 
     target1 = tempdir / 'partitioned1'
+    paths_written = []
+
+    def file_visitor(written_file):
+        print(f'Visiting {written_file.path}')

Review comment:
       We should probably remove this print statement at the end before merging? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665440409



##########
File path: python/pyarrow/parquet.py
##########
@@ -1958,8 +1958,11 @@ def write_to_dataset(table, root_path, partition_cols=None,
             "implementation."
         )
         metadata_collector = kwargs.pop('metadata_collector', None)
+        file_visitor = None
         if metadata_collector is not None:
-            raise ValueError(msg.format("metadata_collector"))
+            def file_visitor(written_file):
+                if written_file.metadata:

Review comment:
       It could instead be an `assert`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665252111



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3269,8 +3308,19 @@ def test_write_dataset_parquet(tempdir):
 
     # using default "parquet" format string
 
+    files_correct_metadata = 0
+
+    def file_visitor(written_file):
+        nonlocal files_correct_metadata
+        if (written_file.metadata is not None and
+                written_file.metadata.num_columns == 3):
+            files_correct_metadata += 1

Review comment:
       I moved this check into its own test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665280601



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,
         metadata_collector=metadata_collector
     )
-    return metadata_path, table
+    return metadata_path, partitionless_schema
 
 
 @pytest.mark.parquet
 @pytest.mark.pandas  # write_to_dataset currently requires pandas
-def test_parquet_dataset_factory(tempdir):
+@pytest.mark.parametrize('use_legacy_dataset', [False, True])
+def test_parquet_dataset_factory(tempdir, use_legacy_dataset):
     root_path = tempdir / "test_parquet_dataset"
-    metadata_path, table = _create_parquet_dataset_simple(root_path)
+    metadata_path, partitionless_schema = _create_parquet_dataset_simple(
+        root_path, use_legacy_dataset)
     dataset = ds.parquet_dataset(metadata_path)
-    assert dataset.schema.equals(table.schema)
+    assert dataset.schema.equals(partitionless_schema)

Review comment:
       But so I don't fully understand what this PR changed that causes this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r666792863



##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,24 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.  This object will have both
+        a path attribute and a metadata attribute.
+
+        The path attribute will a str containing the absolute path to

Review comment:
       Based on a quick test, it are not always absolute paths, but it depends on what has been passed as base_dir for the written dataset (whether that's an absolute or relative path).
   
   ```suggestion
           The path attribute will a str containing the path to
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665598566



##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,12 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.  This object will contain
+        the path and (if the dataset is a parquet dataset) the parquet

Review comment:
       The WrittenFile class is (currently) not exposed in the `pyarrow.dataset` namespace (and I think it is good to keep it that way, to not have users rely on the specific class), so I think we still need to be more explicit: eg "contain the path" -> "have a `path` attribute" 
   
   A small example might also help to illustrate, eg this one from the tests:
   
   ```
       visited_paths = []
   
       def file_visitor(written_file):
           visited_paths.append(written_file.path)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r666783710



##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,12 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.  This object will contain
+        the path and (if the dataset is a parquet dataset) the parquet

Review comment:
       > For my education, what is the concern with users relying on this class? It seems less brittle than users relying on a snippet of documentation.
   
   It just "locks us in" on using exactly this class (as users could start relying on the actual specific class (eg `isinstance(obj, WrittenFile`, although that shouldn't be useful in practice), instead of the interface of the class (the fact that it has a path and metadata attributes). Without publicly exposing the class, it gives us the freedom in the future to change this (eg expose the actual FileWriter) without having to worry about possibly breaking code, as long as it still has the path and metadata attributes.
   
   I personally follow Ben's comment about this basically being a namedtuple, but since cython doesn't support namedtuples, a simple class seems a good alternative (a difference with eg ReadOptions, is that a user never creates a WrittenFile themselves). 
   
   Now, I don't care that much about it, and we could also simply expose it publicly :)  (i.e. import it in the pyarrow.dataset namespace and add the class to the API reference docs)
   But I think with your current updated docstring of write_dataset, this is clear enough.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663270720



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3025,14 +3072,68 @@ def _filesystemdataset_write(
         CFileSystemDatasetWriteOptions c_options
         shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
+        dict visit_args
+        function[cb_writer_finish] c_post_finish_cb
 
     c_options.file_write_options = file_options.unwrap()
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    c_post_finish_cb = _filesystemdataset_write_visitor
+    if file_visitor is not None:
+        visit_args = {'base_dir': c_options.base_dir,
+                      'file_visitor': file_visitor}
+        c_options.writer_post_finish = BindFunction[cb_writer_finish_internal](
+            &_filesystemdataset_write_visitor, visit_args)
 
     c_scanner = data.unwrap()
     with nogil:
         check_status(CFileSystemDataset.Write(c_options, c_scanner))
+
+
+# basic test to roundtrip through a BoundFunction

Review comment:
       I just got rid of this.  It came along with your experiment and I don't know that we really need it.  Now that BindFunction is actually used we test it indirectly that way.

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):
+    """
+    Metadata information about files written as
+    part of a dataset write operation
+    """
+
+    """The full path to the created file"""
+    cdef public str path
+    """If the file is a parquet file this will contain the parquet metadata"""
+    cdef public object metadata
+
+    def __init__(self, path, metadata):
+        self.path = path
+        self.metadata = metadata
+
+cdef void _filesystemdataset_write_visitor(
+        dict visit_args,
+        CFileWriter* file_writer):
+    cdef:
+        str path
+        str base_dir
+        WrittenFile written_file
+        FileMetaData parquet_metadata
+        CParquetFileWriter* parquet_file_writer
+
+    if file_writer == nullptr:
+        return
+
+    parquet_metadata = None
+    path = frombytes(deref(file_writer).destination().path)
+    base_dir = frombytes(visit_args['base_dir'])

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665248434



##########
File path: python/pyarrow/parquet.py
##########
@@ -1958,8 +1958,11 @@ def write_to_dataset(table, root_path, partition_cols=None,
             "implementation."
         )
         metadata_collector = kwargs.pop('metadata_collector', None)
+        file_visitor = None
         if metadata_collector is not None:
-            raise ValueError(msg.format("metadata_collector"))
+            def file_visitor(written_file):
+                if written_file.metadata:

Review comment:
       I suppose not, I have removed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r661774861



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3025,14 +3072,68 @@ def _filesystemdataset_write(
         CFileSystemDatasetWriteOptions c_options
         shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
+        dict visit_args
+        function[cb_writer_finish] c_post_finish_cb
 
     c_options.file_write_options = file_options.unwrap()
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    c_post_finish_cb = _filesystemdataset_write_visitor

Review comment:
       This seems to be unused

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3095,10 +3104,24 @@ def test_write_dataset_use_threads(tempdir):
         pa.schema([("part", pa.string())]), flavor="hive")
 
     target1 = tempdir / 'partitioned1'
+    paths_written = []
+
+    def file_visitor(written_file):
+        paths_written.append(written_file.path)
+
     ds.write_dataset(
         dataset, target1, format="feather", partitioning=partitioning,
-        use_threads=True
+        use_threads=True, file_visitor=file_visitor
     )
+    expected_paths = [
+        target1 / 'part=a' / 'part-0.feather',
+        target1 / 'part=a' / 'part-1.feather',
+        target1 / 'part=b' / 'part-0.feather',
+        target1 / 'part=b' / 'part-1.feather'
+    ]
+    for path in paths_written:
+        assert pathlib.Path(path) in expected_paths

Review comment:
       This technically only requires that paths_written is a subset of expected_paths
   ```suggestion
       assert set(map(pathlib.Path, paths_written)) == {
           target1 / 'part=a' / 'part-0.feather',
           target1 / 'part=a' / 'part-1.feather',
           target1 / 'part=b' / 'part-0.feather',
           target1 / 'part=b' / 'part-1.feather',
       }
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3025,14 +3072,68 @@ def _filesystemdataset_write(
         CFileSystemDatasetWriteOptions c_options
         shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
+        dict visit_args
+        function[cb_writer_finish] c_post_finish_cb
 
     c_options.file_write_options = file_options.unwrap()
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    c_post_finish_cb = _filesystemdataset_write_visitor
+    if file_visitor is not None:
+        visit_args = {'base_dir': c_options.base_dir,
+                      'file_visitor': file_visitor}
+        c_options.writer_post_finish = BindFunction[cb_writer_finish_internal](
+            &_filesystemdataset_write_visitor, visit_args)
 
     c_scanner = data.unwrap()
     with nogil:
         check_status(CFileSystemDataset.Write(c_options, c_scanner))
+
+
+# basic test to roundtrip through a BoundFunction

Review comment:
       This should probably be moved to `_common.pyx`, or maybe inlined in `test_cython.py`

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):
+    """
+    Metadata information about files written as
+    part of a dataset write operation
+    """
+
+    """The full path to the created file"""
+    cdef public str path
+    """If the file is a parquet file this will contain the parquet metadata"""
+    cdef public object metadata
+
+    def __init__(self, path, metadata):
+        self.path = path
+        self.metadata = metadata
+
+cdef void _filesystemdataset_write_visitor(
+        dict visit_args,
+        CFileWriter* file_writer):
+    cdef:
+        str path
+        str base_dir
+        WrittenFile written_file
+        FileMetaData parquet_metadata
+        CParquetFileWriter* parquet_file_writer
+
+    if file_writer == nullptr:

Review comment:
       When would this happen? That seems like it should be considered a pure error on the part of FileSystemDataset::Write

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):
+    """
+    Metadata information about files written as
+    part of a dataset write operation
+    """
+
+    """The full path to the created file"""
+    cdef public str path
+    """If the file is a parquet file this will contain the parquet metadata"""
+    cdef public object metadata
+
+    def __init__(self, path, metadata):
+        self.path = path
+        self.metadata = metadata
+
+cdef void _filesystemdataset_write_visitor(
+        dict visit_args,
+        CFileWriter* file_writer):
+    cdef:
+        str path
+        str base_dir
+        WrittenFile written_file
+        FileMetaData parquet_metadata
+        CParquetFileWriter* parquet_file_writer
+
+    if file_writer == nullptr:
+        return
+
+    parquet_metadata = None
+    path = frombytes(deref(file_writer).destination().path)
+    base_dir = frombytes(visit_args['base_dir'])

Review comment:
       please move this into the `if metadata:` block, since it's only used there

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):

Review comment:
       Since this is essentially a named tuple with optional properties based on the format, I think it'd be better to just use a `dict` instead




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663271279



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3095,10 +3104,24 @@ def test_write_dataset_use_threads(tempdir):
         pa.schema([("part", pa.string())]), flavor="hive")
 
     target1 = tempdir / 'partitioned1'
+    paths_written = []
+
+    def file_visitor(written_file):
+        paths_written.append(written_file.path)
+
     ds.write_dataset(
         dataset, target1, format="feather", partitioning=partitioning,
-        use_threads=True
+        use_threads=True, file_visitor=file_visitor
     )
+    expected_paths = [
+        target1 / 'part=a' / 'part-0.feather',
+        target1 / 'part=a' / 'part-1.feather',
+        target1 / 'part=b' / 'part-0.feather',
+        target1 / 'part=b' / 'part-1.feather'
+    ]
+    for path in paths_written:
+        assert pathlib.Path(path) in expected_paths

Review comment:
       That was intentional.  Only two paths are written but I don't know if part-0.feather goes in part=a or part=b (since it is multithreaded).  I refined the check to be a little clearer and added a comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#issuecomment-871678908


   @bkietz @jorisvandenbossche would appreciate any review if you have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663271957



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):

Review comment:
       Hmm...I'm not sure I agree.  The same could be said of ReadOptions, ParseOptions, etc.  Since this is part of the public API and the available keys are well defined I think it best to create a class so it can be documented and communicated exactly what should be provided.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r666787458



##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,24 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.  This object will have both
+        a path attribute and a metadata attribute.
+
+        The path attribute will a str containing the absolute path to
+        the created file.
+
+        The metadata attribute will be the parquet metadata of the file.
+        This metadata will have the file path attribute set and can be used
+        to build a _metadata file.  The metadata attribute will be None if
+        the format is not parquet.
+
+        # Example visitor which simple collects the filenames created
+        visited_paths = []
+
+        def file_visitor(written_file):
+            visited_paths.append(written_file.path)

Review comment:
       ```suggestion
           Example visitor which simple collects the filenames created::
   
               visited_paths = []
   
               def file_visitor(written_file):
                   visited_paths.append(written_file.path)
   ```
   
   (then it will render in a code block in the online docs)

##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,24 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.  This object will have both
+        a path attribute and a metadata attribute.
+
+        The path attribute will a str containing the absolute path to

Review comment:
       Based on a quick test, it are not always absolute paths, but it depends on what has been passed as base_dir for the written dataset (whether that's an absolute or relative path).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665252360



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,

Review comment:
       See below comment.  Ideally no, but will be addressed in ARROW-13269




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663986392



##########
File path: python/pyarrow/parquet.py
##########
@@ -1958,8 +1958,11 @@ def write_to_dataset(table, root_path, partition_cols=None,
             "implementation."
         )
         metadata_collector = kwargs.pop('metadata_collector', None)
+        file_visitor = None
         if metadata_collector is not None:
-            raise ValueError(msg.format("metadata_collector"))
+            def file_visitor(written_file):
+                if written_file.metadata:

Review comment:
       Is this `if` check needed (in theory)? Since this will always be parquet files in this case

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,
         metadata_collector=metadata_collector
     )
-    return metadata_path, table
+    return metadata_path, partitionless_schema
 
 
 @pytest.mark.parquet
 @pytest.mark.pandas  # write_to_dataset currently requires pandas
-def test_parquet_dataset_factory(tempdir):
+@pytest.mark.parametrize('use_legacy_dataset', [False, True])
+def test_parquet_dataset_factory(tempdir, use_legacy_dataset):
     root_path = tempdir / "test_parquet_dataset"
-    metadata_path, table = _create_parquet_dataset_simple(root_path)
+    metadata_path, partitionless_schema = _create_parquet_dataset_simple(
+        root_path, use_legacy_dataset)
     dataset = ds.parquet_dataset(metadata_path)
-    assert dataset.schema.equals(table.schema)
+    assert dataset.schema.equals(partitionless_schema)

Review comment:
       This doesn't seem right to me? I think the dataset's schema should include the partition columns?

##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.

Review comment:
       Should we also mention that the Parquet metadata has been updated with the written file path?

##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.

Review comment:
       Since "WrittenFile" is not a generally know pyarrow class, I would give a bit more details on this (eg the fact that you can get the path and (if parquet) metadata. 
   
   And maybe also give an example use case, something like "For example, this enables to collect the paths or metadata of all written files"

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,

Review comment:
       Is this change needed?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2810,11 +2819,11 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
 
     # filtering fragments should not open any file
     with assert_opens([]):
-        list(dataset.get_fragments(ds.field("f1") > 15))
+        list(dataset.get_fragments(ds.field("f2") > 15))

Review comment:
       The "f2" column is random normal around 0, so this filter will never yield anything

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3269,8 +3308,19 @@ def test_write_dataset_parquet(tempdir):
 
     # using default "parquet" format string
 
+    files_correct_metadata = 0
+
+    def file_visitor(written_file):
+        nonlocal files_correct_metadata
+        if (written_file.metadata is not None and
+                written_file.metadata.num_columns == 3):
+            files_correct_metadata += 1

Review comment:
       Can you maybe move this into a separate test? (it makes reading the current test for basic functionality a bit complicated IMO)

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2795,7 +2804,7 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
     # created with ParquetDatasetFactory from a _metadata file
 
     root_path = tempdir / "test_parquet_dataset_lazy_filtering"
-    metadata_path, _ = _create_parquet_dataset_simple(root_path)
+    metadata_path, _ = _create_parquet_dataset_simple(root_path, True)

Review comment:
       You can also add a default for this keyword in the helper function

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3130,19 +3159,29 @@ def test_write_table(tempdir):
 
     # with partitioning
     base_dir = tempdir / 'partitioned'
+    expected_paths = [
+        base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow",
+        base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow"
+    ]
+
+    visited_paths = []
+
+    def file_visitor(written_file):
+        nonlocal visited_paths

Review comment:
       Is this `nonlocal` needed?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3095,10 +3104,30 @@ def test_write_dataset_use_threads(tempdir):
         pa.schema([("part", pa.string())]), flavor="hive")
 
     target1 = tempdir / 'partitioned1'
+    paths_written = []
+
+    def file_visitor(written_file):
+        print(f'Visiting {written_file.path}')

Review comment:
       We should probably remove this print statement at the end before merging? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665249658



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,
         metadata_collector=metadata_collector
     )
-    return metadata_path, table
+    return metadata_path, partitionless_schema
 
 
 @pytest.mark.parquet
 @pytest.mark.pandas  # write_to_dataset currently requires pandas
-def test_parquet_dataset_factory(tempdir):
+@pytest.mark.parametrize('use_legacy_dataset', [False, True])
+def test_parquet_dataset_factory(tempdir, use_legacy_dataset):
     root_path = tempdir / "test_parquet_dataset"
-    metadata_path, table = _create_parquet_dataset_simple(root_path)
+    metadata_path, partitionless_schema = _create_parquet_dataset_simple(
+        root_path, use_legacy_dataset)
     dataset = ds.parquet_dataset(metadata_path)
-    assert dataset.schema.equals(table.schema)
+    assert dataset.schema.equals(partitionless_schema)

Review comment:
       I agree with you that it isn't right (and now there is https://stackoverflow.com/questions/68277701/write-pandas-dataframe-parquet-metadata-with-partition-columns#comment120671321_68277701 ).  However, that was the legacy behavior, and I'd rather not tackle it as part of this PR.  I have opened up ARROW-13269 to address it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665252004



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3130,19 +3159,29 @@ def test_write_table(tempdir):
 
     # with partitioning
     base_dir = tempdir / 'partitioned'
+    expected_paths = [
+        base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow",
+        base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow"
+    ]
+
+    visited_paths = []
+
+    def file_visitor(written_file):
+        nonlocal visited_paths

Review comment:
       No.  I've removed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663271097



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -3009,6 +3009,52 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):
+    """
+    Metadata information about files written as
+    part of a dataset write operation
+    """
+
+    """The full path to the created file"""
+    cdef public str path
+    """If the file is a parquet file this will contain the parquet metadata"""
+    cdef public object metadata
+
+    def __init__(self, path, metadata):
+        self.path = path
+        self.metadata = metadata
+
+cdef void _filesystemdataset_write_visitor(
+        dict visit_args,
+        CFileWriter* file_writer):
+    cdef:
+        str path
+        str base_dir
+        WrittenFile written_file
+        FileMetaData parquet_metadata
+        CParquetFileWriter* parquet_file_writer
+
+    if file_writer == nullptr:

Review comment:
       Yep, I was thinking of writing a dataset to something other than a filesystem but there is no reason to think it would use a the same method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#issuecomment-871209883


   This PR now includes #10619 .  Some notes for review:
   
    * Ben and I took parallel approaches at this.  Ben's approach was to mirror the C++ API and create a FileWriter to wrap CFileWriter.  My approach was to create a WrittenFile class which is just the path & metadata (if present) and expose that as `file_visitor`.  I'm happy to switch if we feel the other is better.  My rationale was "FileWriter is an internal class, best to hide the concept and only expose what is needed."
    * The existing metadata_collector is a bit clunky when working with partitioned datasets.  The _metadata file does not contain the partition columns.  This does appear to be the intent (with common_metadata, if it exists, containing the full schema) but without a working spark/hadoop setup I can't be completely certain.
    * The existing tests for metadata_collector were calling write_dataset on the same directory multiple times and expecting multiple files to be created (since the legacy writer uses a guid for naming).  This seems somewhat related to ARROW-12358.  I just updated the test to call write_dataset once with a partitioned column.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace edited a comment on pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace edited a comment on pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#issuecomment-871209883


   This PR now includes #10619 .  Some notes for review:
   
    * Ben and I took parallel approaches at this.  Ben's approach was to mirror the C++ API and create a FileWriter to wrap CFileWriter.  My approach was to create a WrittenFile class which is just the path & metadata (if present) and expose that as `file_visitor`.  I'm happy to switch if we feel the other is better.  My rationale was "FileWriter is an internal class, best to hide the concept and only expose what is needed."
    * ARROW-10440 added a visitor to be called right before finish was called on a file.  For metadata_collector to work I needed to create a visitor that is called right after finish is called on the file so I added the second visitor as part of this PR.
    * The existing metadata_collector is a bit clunky when working with partitioned datasets.  The _metadata file does not contain the partition columns.  This does appear to be the intent (with common_metadata, if it exists, containing the full schema) but without a working spark/hadoop setup I can't be completely certain.
    * The existing tests for metadata_collector were calling write_dataset on the same directory multiple times and expecting multiple files to be created (since the legacy writer uses a guid for naming).  This seems somewhat related to ARROW-12358.  I just updated the test to call write_dataset once with a partitioned column.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10628: ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r665249741



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2795,7 +2804,7 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
     # created with ParquetDatasetFactory from a _metadata file
 
     root_path = tempdir / "test_parquet_dataset_lazy_filtering"
-    metadata_path, _ = _create_parquet_dataset_simple(root_path)
+    metadata_path, _ = _create_parquet_dataset_simple(root_path, True)

Review comment:
       I added the default.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org