Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/03 11:23:00 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #13033: ARROW-16413: [Python] Certain dataset APIs hang with a python filesystem

pitrou commented on code in PR #13033:
URL: https://github.com/apache/arrow/pull/13033#discussion_r863678348


##########
python/pyarrow/tests/test_dataset.py:
##########
@@ -3058,25 +3108,52 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _write_metadata_filesystem(
+    schema, path, filesystem, metadata_collector=None, **kwargs
+):
     """
-    Creates a simple (flat files, no nested partitioning) Parquet dataset
+    Version of pq.write_metadata that works with a filesystem
     """
+    with filesystem.open_output_stream(path) as sink:
+        writer = pq.ParquetWriter(sink, schema, **kwargs)
+        writer.close()
 
+    if metadata_collector is not None:
+        # ParquetWriter doesn't expose the metadata until it's written. Write
+        # it and read it again.
+        with filesystem.open_input_file(path) as source:
+            metadata = pq.read_metadata(source)
+        for m in metadata_collector:
+            metadata.append_row_groups(m)
+        with filesystem.open_output_stream(path) as sink:
+            metadata.write_metadata_file(sink)
+
+
+def _create_parquet_dataset_simple(root_path, filesystem=None):
+    """
+    Creates a simple (flat files, no nested partitioning) Parquet dataset
+    """
     metadata_collector = []

Review Comment:
   If `metadata_collector` is an empty list then `metadata.append_row_groups` is never called above? Am I reading this wrong?
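   The control flow in question can be checked in isolation. The sketch below is not pyarrow code: `FakeMetadata` and `merge_collected` are hypothetical stand-ins that only mirror the `if ... is not None` / `for` structure of the helper in the diff. It shows that an empty list passes the `is not None` check while the loop body never runs, and that the loop does run once the list has been filled.

```python
class FakeMetadata:
    """Hypothetical stand-in for a Parquet metadata object; it only records calls."""

    def __init__(self):
        self.appended = []

    def append_row_groups(self, other):
        self.appended.append(other)


def merge_collected(metadata, metadata_collector=None):
    # Mirrors the branch and loop of the helper in the diff, without any I/O.
    if metadata_collector is not None:
        for m in metadata_collector:
            metadata.append_row_groups(m)
    return metadata


# An empty list is "not None", but iterating over it runs the body zero times.
empty_case = merge_collected(FakeMetadata(), [])

# Once the collector has entries, each one is appended in order.
filled_case = merge_collected(FakeMetadata(), ["rg0", "rg1"])

print(len(empty_case.appended), len(filled_case.appended))
```

   Whether this matters for the test depends on whether the list is still empty at the point the helper is called, which the hunk shown here does not cover.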



##########
python/pyarrow/tests/test_dataset.py:
##########
@@ -2582,6 +2600,38 @@ def test_open_dataset_from_fsspec(tempdir):
     assert dataset.schema.equals(table.schema)
 
 
+@pytest.mark.parquet
+@pytest.mark.s3
+def test_file_format_inspect_fsspec(s3_filesystem):
+    # https://issues.apache.org/jira/browse/ARROW-16413
+    from pyarrow.fs import _ensure_filesystem
+
+    fs, (host, port, access_key, secret_key) = s3_filesystem
+
+    # create bucket + file with pyarrow
+    table = pa.table({'a': [1, 2, 3]})
+    path = "mybucket/data.parquet"
+    with fs.open_output_stream(path) as out:
+        pq.write_table(table, out)
+
+    # read using fsspec filesystem
+    import s3fs

Review Comment:
   Does this test get skipped if `s3fs` is not installed?
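   For reference, one common way to make an optional import skip rather than fail is `pytest.importorskip("s3fs")` at the top of the test body. The sketch below stays within the standard library and only illustrates the underlying check; `module_available` and `HAVE_S3FS` are hypothetical names, and nothing here describes how the `s3` marker in this test suite actually behaves.

```python
import importlib.util


def module_available(name):
    """Return True if the top-level module `name` can be found, without importing it."""
    return importlib.util.find_spec(name) is not None


# Evaluated once at import time of the test module.
HAVE_S3FS = module_available("s3fs")

# In a pytest module this flag could feed a marker such as
#   @pytest.mark.skipif(not HAVE_S3FS, reason="s3fs not installed")
# or the test body could instead begin with
#   s3fs = pytest.importorskip("s3fs")
print("s3fs available:", HAVE_S3FS)
```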



##########
python/pyarrow/tests/test_dataset.py:
##########
@@ -3092,6 +3169,30 @@ def test_parquet_dataset_factory(tempdir):
     assert result.num_rows == 40
 
 
+@pytest.mark.parquet
+@pytest.mark.s3
+def test_parquet_dataset_factory_fsspec(s3_filesystem):
+    # https://issues.apache.org/jira/browse/ARROW-16413
+    fs, (host, port, access_key, secret_key) = s3_filesystem
+
+    # create dataset with pyarrow
+    root_path = "mybucket/test_parquet_dataset"
+    metadata_path, table = _create_parquet_dataset_simple(root_path, fs)
+
+    # read using fsspec filesystem
+    import s3fs

Review Comment:
   Same question re: skipping.



##########
python/pyarrow/tests/test_dataset.py:
##########
@@ -3058,25 +3108,52 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _write_metadata_filesystem(
+    schema, path, filesystem, metadata_collector=None, **kwargs
+):
     """
-    Creates a simple (flat files, no nested partitioning) Parquet dataset
+    Version of pq.write_metadata that works with a filesystem
     """
+    with filesystem.open_output_stream(path) as sink:
+        writer = pq.ParquetWriter(sink, schema, **kwargs)
+        writer.close()
 
+    if metadata_collector is not None:
+        # ParquetWriter doesn't expose the metadata until it's written. Write
+        # it and read it again.
+        with filesystem.open_input_file(path) as source:
+            metadata = pq.read_metadata(source)
+        for m in metadata_collector:
+            metadata.append_row_groups(m)
+        with filesystem.open_output_stream(path) as sink:
+            metadata.write_metadata_file(sink)
+
+
+def _create_parquet_dataset_simple(root_path, filesystem=None):
+    """
+    Creates a simple (flat files, no nested partitioning) Parquet dataset
+    """
     metadata_collector = []
 
     for i in range(4):
         table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
         pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
+            table, str(root_path), filesystem=filesystem,
+            metadata_collector=metadata_collector
         )
 
-    metadata_path = str(root_path / '_metadata')
+    metadata_path = str(root_path) + '/_metadata'

Review Comment:
   Shouldn't this be equivalent to the previous `str(root_path / '_metadata')`?
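   The two spellings can be compared directly. This is a minimal sketch that assumes POSIX-style separators (hence `PurePosixPath`) and reuses the root path string from the new test; the variable names are illustrative. The results agree when the root is a path object, but only the concatenation form accepts a plain string, which is what the fsspec test passes in.

```python
from pathlib import PurePosixPath

path_root = PurePosixPath("mybucket/test_parquet_dataset")
str_root = "mybucket/test_parquet_dataset"

# With a path object, both spellings produce the same string.
joined_with_operator = str(path_root / "_metadata")
joined_with_concat = str(path_root) + "/_metadata"

# With a plain string, the `/` operator is not defined and raises TypeError.
try:
    str_root / "_metadata"
    str_operator_ok = True
except TypeError:
    str_operator_ok = False

# Concatenation works for a plain string as well.
concat_from_str = str(str_root) + "/_metadata"

print(joined_with_operator, joined_with_concat, str_operator_ok, concat_from_str)
```

   On a platform whose native path separator is not `/`, the two forms could differ for a native `Path` object, so the equivalence holds only under the POSIX assumption above.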


