Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/10 12:15:46 UTC

[GitHub] [arrow] jorisvandenbossche opened a new pull request #7921: ARROW-9658: [Python] Initial Python bindings for dataset writing

jorisvandenbossche opened a new pull request #7921:
URL: https://github.com/apache/arrow/pull/7921


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#issuecomment-684789982


   Going to merge this, so I can add Parquet support in a follow-up PR.





[GitHub] [arrow] jorisvandenbossche closed pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche closed pull request #7921:
URL: https://github.com/apache/arrow/pull/7921


   





[GitHub] [arrow] bkietz commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r472262132



##########
File path: python/pyarrow/dataset.py
##########
@@ -682,3 +683,68 @@ def dataset(source, schema=None, format=None, filesystem=None,
             'Expected a path-like, list of path-likes or a list of Datasets '
             'instead of the given type: {}'.format(type(source).__name__)
         )
+
+
+def _ensure_write_partitioning(scheme):
+    if scheme is None:
+        scheme = partitioning(pa.schema([]))
+    if not isinstance(scheme, Partitioning):
+        # TODO support passing field names, and get types from schema
+        raise ValueError("partitioning needs to be actual Partitioning object")
+    return scheme
+
+
+def write_dataset(data, base_dir, format=None, partitioning=None, schema=None,
+                  filesystem=None, use_threads=True):
+    """
+    Write a dataset to a given format and partitioning.
+
+    Parameters
+    ----------
+    data : FileSystemDataset, Table/RecordBatch, or list of Table/RecordBatch
+        The data to write. This can be a FileSystemDataset instance or

Review comment:
       ```suggestion
       data : Dataset, Table/RecordBatch, or list of Table/RecordBatch
           The data to write. This can be a Dataset instance or
   ```
   (A UnionDataset could also be written, for example concatenating a Parquet and a CSV dataset into a single IPC dataset)
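A rough illustration of that UnionDataset use case (not from the PR itself; the paths are invented, and it assumes the `write_dataset()` function added in this PR with IPC output):

```python
# Hypothetical sketch: combine an existing Parquet dataset and a CSV dataset
# into a UnionDataset and write the union out as a single IPC ("feather")
# dataset. The paths are made up for illustration.
import pyarrow.dataset as ds

parquet_part = ds.dataset("data/parquet_part/", format="parquet")
csv_part = ds.dataset("data/csv_part/", format="csv")

# Passing a list of Dataset objects to dataset() produces a UnionDataset.
union = ds.dataset([parquet_part, csv_part])

# The output format must be given explicitly, since a union has no single
# source format to default to.
ds.write_dataset(union, "data/combined_ipc/", format="ipc")
```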







[GitHub] [arrow] bkietz commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r472368169



##########
File path: python/pyarrow/dataset.py
##########
@@ -682,3 +683,70 @@ def dataset(source, schema=None, format=None, filesystem=None,
             'Expected a path-like, list of path-likes or a list of Datasets '
             'instead of the given type: {}'.format(type(source).__name__)
         )
+
+
+def _ensure_write_partitioning(scheme):
+    if scheme is None:
+        scheme = partitioning(pa.schema([]))
+    if not isinstance(scheme, Partitioning):
+        # TODO support passing field names, and get types from schema
+        raise ValueError("partitioning needs to be actual Partitioning object")
+    return scheme
+
+
+def write_dataset(data, base_dir, format=None, partitioning=None, schema=None,
+                  filesystem=None, use_threads=True):
+    """
+    Write a dataset to a given format and partitioning.
+
+    Parameters
+    ----------
+    data : Dataset, Table/RecordBatch, or list of Table/RecordBatch
+        The data to write. This can be a Dataset instance or
+        in-memory Arrow data. A Table or RecordBatch is written as a
+        single fragment (resulting in a single file, or multiple files if
+        split according to the `partitioning`). If you have a Table consisting
+        of multiple record batches, you can pass ``table.to_batches()`` to
+        handle each record batch as a separate fragment.
+    base_dir : str
+        The root directory where to write the dataset.
+    format : FileFormat or str
+        The format in which to write the dataset. Currently supported:
+        "ipc"/"feather". If a Dataset is being written and `format` is not
+        specified, it defaults to the same format as the specified Dataset.
+        When writing a Table or RecordBatch, this keyword is required.
+    partitioning : Partitioning, optional
+        The partitioning scheme specified with the ``partitioning()``
+        function.
+    schema : Schema, optional
+    filesystem : FileSystem, optional
+    use_threads : bool, default True
+        Write files in parallel. If enabled, then maximum parallelism will be
+        used determined by the number of available CPU cores.
+    """
+    if isinstance(data, FileSystemDataset):
+        schema = schema or data.schema
+        format = format or data.format

Review comment:
       ```suggestion
       if isinstance(data, Dataset):
           schema = schema or data.schema
           if isinstance(data, FileSystemDataset):
               format = format or data.format
   ```
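For context, a rough usage sketch of the `write_dataset()` API documented in the diff above (column names, values and paths are invented; it assumes the directory-based default of `partitioning()`):

```python
# Hypothetical sketch: write an in-memory table as a partitioned IPC/Feather
# dataset. Data and paths are made up for illustration.
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"year": [2019, 2019, 2020], "value": [1.0, 2.0, 3.0]})

ds.write_dataset(
    table,                 # a Table is written as a single fragment
    "data/out/",           # base_dir
    format="ipc",          # required when writing a Table or RecordBatch
    partitioning=ds.partitioning(pa.schema([("year", pa.int64())])),
)
```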







[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r469990610



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2022,3 +2022,66 @@ def _get_partition_keys(Expression partition_expression):
         frombytes(name_val.first): pyarrow_wrap_scalar(name_val.second).as_py()
         for name_val in GetResultValue(CGetPartitionKeys(deref(expr.get())))
     }
+
+
+def _filesystemdataset_write(
+    data, object base_dir, Schema schema,
+    FileFormat format, FileSystem filesystem, Partitioning partitioning
+):
+    """
+    CFileSystemDataset.Write wrapper
+    """
+    cdef:
+        c_string c_base_dir
+        shared_ptr[CSchema] c_schema
+        shared_ptr[CFileFormat] c_format
+        shared_ptr[CFileSystem] c_filesystem
+        shared_ptr[CPartitioning] c_partitioning
+        shared_ptr[CScanContext] c_context
+        # to create iterator of InMemory fragments
+        vector[shared_ptr[CRecordBatch]] c_batches
+        shared_ptr[CFragment] c_fragment
+        vector[shared_ptr[CFragment]] c_fragment_vector
+
+    c_base_dir = tobytes(_stringify_path(base_dir))
+    c_schema = pyarrow_unwrap_schema(schema)
+    c_format = format.unwrap()
+    c_filesystem = filesystem.unwrap()
+    c_partitioning = partitioning.unwrap()
+    # passthrough use_threads?
+    c_context = _build_scan_context()

Review comment:
       Since there is some parallelism, I think it's useful to be able to set `use_threads`







[GitHub] [arrow] ldacey commented on pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
ldacey commented on pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#issuecomment-693579613


   Do you think it would be possible to add support for repartitioning datasets? I am facing some issues with many small files, simply due to how frequently I need to download data, which is compounded by the partitions.

   I asked this on Jira as well, but:

   1) I download data every 30 minutes from a source, using UUID parquet filenames (each file just contains new or updated records since the last retrieval, so I could not think of a good callback function name). This amounts to 48 parquet files per day.
   2) The data is then partitioned based on the created_date, which creates even more files (some can be quite small).
   3) When I query the dataset, I need to read in a lot of small files.

   I would then want to read the data and repartition the files using a callback function, so that the dozens of files in partition ("date", "==", "2020-09-15") would become 2020-09-15.parquet, consolidated into a single file to keep things tidy. I know I can do this with Spark, but it would be nice to have a native pyarrow method.
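A rough workaround sketch for this use case with the existing APIs (not part of this PR; Parquet output via `write_dataset()` is only planned as a follow-up, so this rewrites with `pyarrow.parquet`; paths, column names and the partitioning flavor are assumptions):

```python
# Hypothetical sketch: consolidate one partition's many small Parquet files
# into a single file by reading that partition back as a table and rewriting it.
import pyarrow.dataset as ds
import pyarrow.parquet as pq

dataset = ds.dataset("data/raw/", format="parquet", partitioning="hive")
day = dataset.to_table(filter=ds.field("date") == "2020-09-15")
pq.write_table(day, "data/consolidated/2020-09-15.parquet")
```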





[GitHub] [arrow] bkietz commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r472368545



##########
File path: python/pyarrow/dataset.py
##########
@@ -682,3 +683,70 @@ def dataset(source, schema=None, format=None, filesystem=None,
             'Expected a path-like, list of path-likes or a list of Datasets '
             'instead of the given type: {}'.format(type(source).__name__)
         )
+
+
+def _ensure_write_partitioning(scheme):
+    if scheme is None:
+        scheme = partitioning(pa.schema([]))
+    if not isinstance(scheme, Partitioning):
+        # TODO support passing field names, and get types from schema
+        raise ValueError("partitioning needs to be actual Partitioning object")
+    return scheme
+
+
+def write_dataset(data, base_dir, format=None, partitioning=None, schema=None,
+                  filesystem=None, use_threads=True):
+    """
+    Write a dataset to a given format and partitioning.
+
+    Parameters
+    ----------
+    data : Dataset, Table/RecordBatch, or list of Table/RecordBatch
+        The data to write. This can be a Dataset instance or
+        in-memory Arrow data. A Table or RecordBatch is written as a
+        single fragment (resulting in a single file, or multiple files if
+        split according to the `partitioning`). If you have a Table consisting
+        of multiple record batches, you can pass ``table.to_batches()`` to
+        handle each record batch as a separate fragment.
+    base_dir : str
+        The root directory where to write the dataset.
+    format : FileFormat or str
+        The format in which to write the dataset. Currently supported:
+        "ipc"/"feather". If a Dataset is being written and `format` is not
+        specified, it defaults to the same format as the specified Dataset.
+        When writing a Table or RecordBatch, this keyword is required.

Review comment:
       ```suggestion
           "ipc"/"feather". If a FileSystemDataset is being written and `format` is not
           specified, it defaults to the same format as the specified FileSystemDataset.
           When writing a Table or RecordBatch, this keyword is required.
   ```







[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r469985388



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2022,3 +2022,66 @@ def _get_partition_keys(Expression partition_expression):
         frombytes(name_val.first): pyarrow_wrap_scalar(name_val.second).as_py()
         for name_val in GetResultValue(CGetPartitionKeys(deref(expr.get())))
     }
+
+
+def _filesystemdataset_write(
+    data, object base_dir, Schema schema,
+    FileFormat format, FileSystem filesystem, Partitioning partitioning

Review comment:
       Indeed







[GitHub] [arrow] github-actions[bot] commented on pull request #7921: ARROW-9658: [Python] Initial Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#issuecomment-671323490


   https://issues.apache.org/jira/browse/ARROW-9658





[GitHub] [arrow] bkietz commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r472259523



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2022,3 +2022,66 @@ def _get_partition_keys(Expression partition_expression):
         frombytes(name_val.first): pyarrow_wrap_scalar(name_val.second).as_py()
         for name_val in GetResultValue(CGetPartitionKeys(deref(expr.get())))
     }
+
+
+def _filesystemdataset_write(
+    data, object base_dir, Schema schema,
+    FileFormat format, FileSystem filesystem, Partitioning partitioning
+):
+    """
+    CFileSystemDataset.Write wrapper
+    """
+    cdef:
+        c_string c_base_dir
+        shared_ptr[CSchema] c_schema
+        shared_ptr[CFileFormat] c_format
+        shared_ptr[CFileSystem] c_filesystem
+        shared_ptr[CPartitioning] c_partitioning
+        shared_ptr[CScanContext] c_context
+        # to create iterator of InMemory fragments
+        vector[shared_ptr[CRecordBatch]] c_batches
+        shared_ptr[CFragment] c_fragment
+        vector[shared_ptr[CFragment]] c_fragment_vector
+
+    c_base_dir = tobytes(_stringify_path(base_dir))
+    c_schema = pyarrow_unwrap_schema(schema)
+    c_format = format.unwrap()
+    c_filesystem = filesystem.unwrap()
+    c_partitioning = partitioning.unwrap()
+    # passthrough use_threads?
+    c_context = _build_scan_context()
+
+    if isinstance(data, Dataset):
+        with nogil:
+            check_status(
+                CFileSystemDataset.Write(
+                    c_schema,
+                    c_format,
+                    c_filesystem,
+                    c_base_dir,
+                    c_partitioning,
+                    c_context,
+                    (<Dataset> data).dataset.GetFragments()
+                )
+            )
+    else:
+        # data is list of batches
+        for batch in data:
+            c_batches.push_back((<RecordBatch> batch).sp_batch)
+
+        c_fragment = shared_ptr[CFragment](
+            new CInMemoryFragment(c_batches, _true.unwrap()))
+        c_fragment_vector.push_back(c_fragment)
+
+        with nogil:
+            check_status(
+                CFileSystemDataset.Write(
+                    c_schema,
+                    c_format,
+                    c_filesystem,
+                    c_base_dir,
+                    c_partitioning,
+                    c_context,
+                    MakeVectorIterator(c_fragment_vector)

Review comment:
       Great, this provides a minimal handle on write parallelism; this will be necessary until the C++ implementation can be made more robustly parallel than "one thread per written fragment".







[GitHub] [arrow] bkietz commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r469950141



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2022,3 +2022,66 @@ def _get_partition_keys(Expression partition_expression):
         frombytes(name_val.first): pyarrow_wrap_scalar(name_val.second).as_py()
         for name_val in GetResultValue(CGetPartitionKeys(deref(expr.get())))
     }
+
+
+def _filesystemdataset_write(
+    data, object base_dir, Schema schema,
+    FileFormat format, FileSystem filesystem, Partitioning partitioning

Review comment:
       Should these be marked `not None`?

##########
File path: python/pyarrow/includes/libarrow_dataset.pxd
##########
@@ -126,6 +126,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     ctypedef CIterator[shared_ptr[CFragment]] CFragmentIterator \
         "arrow::dataset::FragmentIterator"
 
+    cdef cppclass CInMemoryFragment "arrow::dataset::InMemoryFragment"(
+            CFragment):
+        CInMemoryFragment(vector[shared_ptr[CRecordBatch]] record_batches,
+                          shared_ptr[CExpression] scalar)

Review comment:
       ```suggestion
                             shared_ptr[CExpression] partition_expression)
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2022,3 +2022,66 @@ def _get_partition_keys(Expression partition_expression):
         frombytes(name_val.first): pyarrow_wrap_scalar(name_val.second).as_py()
         for name_val in GetResultValue(CGetPartitionKeys(deref(expr.get())))
     }
+
+
+def _filesystemdataset_write(
+    data, object base_dir, Schema schema,
+    FileFormat format, FileSystem filesystem, Partitioning partitioning
+):
+    """
+    CFileSystemDataset.Write wrapper
+    """
+    cdef:
+        c_string c_base_dir
+        shared_ptr[CSchema] c_schema
+        shared_ptr[CFileFormat] c_format
+        shared_ptr[CFileSystem] c_filesystem
+        shared_ptr[CPartitioning] c_partitioning
+        shared_ptr[CScanContext] c_context
+        # to create iterator of InMemory fragments
+        vector[shared_ptr[CRecordBatch]] c_batches
+        shared_ptr[CFragment] c_fragment
+        vector[shared_ptr[CFragment]] c_fragment_vector
+
+    c_base_dir = tobytes(_stringify_path(base_dir))
+    c_schema = pyarrow_unwrap_schema(schema)
+    c_format = format.unwrap()
+    c_filesystem = filesystem.unwrap()
+    c_partitioning = partitioning.unwrap()
+    # passthrough use_threads?
+    c_context = _build_scan_context()
+
+    if isinstance(data, Dataset):
+        with nogil:
+            check_status(
+                CFileSystemDataset.Write(
+                    c_schema,
+                    c_format,
+                    c_filesystem,
+                    c_base_dir,
+                    c_partitioning,
+                    c_context,
+                    (<Dataset> data).dataset.GetFragments()
+                )
+            )
+    else:
+        # data is list of batches
+        for batch in data:
+            c_batches.push_back((<RecordBatch> batch).sp_batch)
+
+        c_fragment = shared_ptr[CFragment](
+            new CInMemoryFragment(c_batches, _true.unwrap()))
+        c_fragment_vector.push_back(c_fragment)
+
+        with nogil:
+            check_status(
+                CFileSystemDataset.Write(
+                    c_schema,
+                    c_format,
+                    c_filesystem,
+                    c_base_dir,
+                    c_partitioning,
+                    c_context,
+                    MakeVectorIterator(c_fragment_vector)

Review comment:
       Currently we only parallelize over written fragments, so since this creates only a single written fragment, use_threads is effectively ignored. I'd recommend creating a fragment wrapping each batch, though this will result in one or more files per input record batch.
    ```suggestion
           # data is list of batches
           for batch in data:
               c_batches.push_back((<RecordBatch> batch).sp_batch)
               c_fragment = shared_ptr[CFragment](
                   new CInMemoryFragment(c_batches, _true.unwrap()))
               c_batches.clear()
               c_fragment_vector.push_back(c_fragment)
   
           with nogil:
               check_status(
                   CFileSystemDataset.Write(
                       c_schema,
                       c_format,
                       c_filesystem,
                       c_base_dir,
                       c_partitioning,
                       c_context,
                       MakeVectorIterator(move(c_fragment_vector))
   ```







[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r469988191



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2022,3 +2022,66 @@ def _get_partition_keys(Expression partition_expression):
         frombytes(name_val.first): pyarrow_wrap_scalar(name_val.second).as_py()
         for name_val in GetResultValue(CGetPartitionKeys(deref(expr.get())))
     }
+
+
+def _filesystemdataset_write(
+    data, object base_dir, Schema schema,
+    FileFormat format, FileSystem filesystem, Partitioning partitioning
+):
+    """
+    CFileSystemDataset.Write wrapper
+    """
+    cdef:
+        c_string c_base_dir
+        shared_ptr[CSchema] c_schema
+        shared_ptr[CFileFormat] c_format
+        shared_ptr[CFileSystem] c_filesystem
+        shared_ptr[CPartitioning] c_partitioning
+        shared_ptr[CScanContext] c_context
+        # to create iterator of InMemory fragments
+        vector[shared_ptr[CRecordBatch]] c_batches
+        shared_ptr[CFragment] c_fragment
+        vector[shared_ptr[CFragment]] c_fragment_vector
+
+    c_base_dir = tobytes(_stringify_path(base_dir))
+    c_schema = pyarrow_unwrap_schema(schema)
+    c_format = format.unwrap()
+    c_filesystem = filesystem.unwrap()
+    c_partitioning = partitioning.unwrap()
+    # passthrough use_threads?
+    c_context = _build_scan_context()
+
+    if isinstance(data, Dataset):
+        with nogil:
+            check_status(
+                CFileSystemDataset.Write(
+                    c_schema,
+                    c_format,
+                    c_filesystem,
+                    c_base_dir,
+                    c_partitioning,
+                    c_context,
+                    (<Dataset> data).dataset.GetFragments()
+                )
+            )
+    else:
+        # data is list of batches
+        for batch in data:
+            c_batches.push_back((<RecordBatch> batch).sp_batch)
+
+        c_fragment = shared_ptr[CFragment](
+            new CInMemoryFragment(c_batches, _true.unwrap()))
+        c_fragment_vector.push_back(c_fragment)
+
+        with nogil:
+            check_status(
+                CFileSystemDataset.Write(
+                    c_schema,
+                    c_format,
+                    c_filesystem,
+                    c_base_dir,
+                    c_partitioning,
+                    c_context,
+                    MakeVectorIterator(c_fragment_vector)

Review comment:
       Yes, this is one of the aspects I wanted to ask about: indeed, right now, by creating a single Fragment, the data gets written to a single file. If you have a big table, you might want to write one file per batch. On the other hand, that might also result in a lot of small files (certainly if you also split on some partition columns). E.g. in Parquet, you typically have files with multiple row groups (which might be somewhat comparable to multiple record batches).

   So we should maybe have some configurability?
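One user-side way to get that kind of configurability, assuming the list-of-Tables input discussed later in this thread, would be to group batches into chunks before passing them, so each chunk becomes one fragment (and thus one file per partition directory). A hypothetical sketch, with an invented helper name and chunk size:

```python
# Hypothetical sketch: regroup a table's record batches so each written
# fragment holds several batches instead of one.
import pyarrow as pa

def batches_to_fragments(table, batches_per_fragment=4):
    batches = table.to_batches()
    return [
        pa.Table.from_batches(batches[i:i + batches_per_fragment])
        for i in range(0, len(batches), batches_per_fragment)
    ]

# Each element of the returned list would then be written as its own fragment:
# ds.write_dataset(batches_to_fragments(table), "data/out/", format="ipc")
```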







[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7921: ARROW-9658: [Python] Python bindings for dataset writing

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7921:
URL: https://github.com/apache/arrow/pull/7921#discussion_r472119868



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2022,3 +2022,66 @@ def _get_partition_keys(Expression partition_expression):
         frombytes(name_val.first): pyarrow_wrap_scalar(name_val.second).as_py()
         for name_val in GetResultValue(CGetPartitionKeys(deref(expr.get())))
     }
+
+
+def _filesystemdataset_write(
+    data, object base_dir, Schema schema,
+    FileFormat format, FileSystem filesystem, Partitioning partitioning
+):
+    """
+    CFileSystemDataset.Write wrapper
+    """
+    cdef:
+        c_string c_base_dir
+        shared_ptr[CSchema] c_schema
+        shared_ptr[CFileFormat] c_format
+        shared_ptr[CFileSystem] c_filesystem
+        shared_ptr[CPartitioning] c_partitioning
+        shared_ptr[CScanContext] c_context
+        # to create iterator of InMemory fragments
+        vector[shared_ptr[CRecordBatch]] c_batches
+        shared_ptr[CFragment] c_fragment
+        vector[shared_ptr[CFragment]] c_fragment_vector
+
+    c_base_dir = tobytes(_stringify_path(base_dir))
+    c_schema = pyarrow_unwrap_schema(schema)
+    c_format = format.unwrap()
+    c_filesystem = filesystem.unwrap()
+    c_partitioning = partitioning.unwrap()
+    # passthrough use_threads?
+    c_context = _build_scan_context()
+
+    if isinstance(data, Dataset):
+        with nogil:
+            check_status(
+                CFileSystemDataset.Write(
+                    c_schema,
+                    c_format,
+                    c_filesystem,
+                    c_base_dir,
+                    c_partitioning,
+                    c_context,
+                    (<Dataset> data).dataset.GetFragments()
+                )
+            )
+    else:
+        # data is list of batches
+        for batch in data:
+            c_batches.push_back((<RecordBatch> batch).sp_batch)
+
+        c_fragment = shared_ptr[CFragment](
+            new CInMemoryFragment(c_batches, _true.unwrap()))
+        c_fragment_vector.push_back(c_fragment)
+
+        with nogil:
+            check_status(
+                CFileSystemDataset.Write(
+                    c_schema,
+                    c_format,
+                    c_filesystem,
+                    c_base_dir,
+                    c_partitioning,
+                    c_context,
+                    MakeVectorIterator(c_fragment_vector)

Review comment:
       @bkietz So I updated this to enable the use case of writing multiple fragments, but as an explicit choice by the user -> passing a single Table/RecordBatch = writing a single fragment (even if the Table has multiple batches), while passing a list of Tables/RecordBatches = writing multiple fragments.

   So if a user has a Table with multiple batches and wants to write it as multiple files instead of a single file (in case there is no partitioning), they can do `write_dataset(table.to_batches())`.
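A rough sketch of the two behaviors described above (paths are invented):

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"a": list(range(10_000))})

# A single Table is written as one fragment
# (one file per partition directory).
ds.write_dataset(table, "data/single/", format="ipc")

# A list of RecordBatches writes each batch as its own fragment/file.
ds.write_dataset(table.to_batches(), "data/per_batch/", format="ipc")
```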



