Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/15 11:45:10 UTC

[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10693: ARROW-13224: [Python][Doc] Documentation missing for pyarrow.dataset.write_dataset

jorisvandenbossche commented on a change in pull request #10693:
URL: https://github.com/apache/arrow/pull/10693#discussion_r670370362



##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files

Review comment:
       "Scheduling" is a bit strange term to use in this context? 

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  This is

Review comment:
       ```suggestion
   The previous examples have demonstrated how to read the data into a table using :func:`~Dataset.to_table`.  This is
   ```
   
   ?

##########
File path: python/pyarrow/_hdfs.pyx
##########
@@ -93,9 +93,10 @@ cdef class HadoopFileSystem(FileSystem):
         Instantiate HadoopFileSystem object from an URI string.
 
         The following two calls are equivalent
-        * HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test'
-                                    '&replication=1')
-        * HadoopFileSystem('localhost', port=8020, user='test', replication=1)
+
+        * ``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test&replication=1')`` # noqa: E501
+
+        * ``HadoopFileSystem('localhost', port=8020, user='test', replication=1)`` # noqa: E501

Review comment:
       ```suggestion
           * ``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test\
   &replication=1')``
           * ``HadoopFileSystem('localhost', port=8020, user='test', \
   replication=1)``
   ```
   
    It's a bit ugly in the source code, but AFAIK this is the best way to deal with this issue. When checking the docstring in a console, this will look like a single line.
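
    As an aside, here is a minimal runnable sketch (using a purely hypothetical ``Example`` class, unrelated to the real filesystem class) of the continuation trick this suggestion relies on: in a non-raw string literal, a backslash immediately followed by a newline is removed at parse time, so the two source lines join into one line in the resulting docstring.

```python
# Hypothetical class used only to demonstrate backslash line continuation
# inside a docstring; it does not import or depend on pyarrow at all.
class Example:
    """The following two calls are equivalent

    * ``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test\
&replication=1')``
    * ``HadoopFileSystem('localhost', port=8020, user='test', \
replication=1)``
    """

# Each bullet prints as one unbroken line: the parser drops the
# backslash-newline pair.  The continuation lines start at column 0 so no
# stray indentation ends up in the rendered text.
print(Example.__doc__)
```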

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  This is
+useful if the dataset is small or there is only a small amount of data that needs to
+be read.  The dataset API contains additional methods to read and process large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  This
+method returns an iterator of record batches.  For example, we can use this method to
+calculate the average of a column without loading the entire column into memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+        col2_sum += pc.sum(batch.column('col2')).as_py()
+        count += batch.num_rows
+    mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and pyarrow
+uses an object called a :class:`Scanner` to do this.  A Scanner is created for you
+automatically by the to_table and to_batches method of the dataset.  Any arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`.  This controls the maximum size of the
+batches returned by the scanner.  Batches can still be smaller than the `batch_size`
+if the dataset consists of small files or those files themselves consist of small
+row groups.  For example, a parquet file with 10,000 rows per row group will yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller value.
+
+The default batch size is one million rows and this is typically a good default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset.  This can be useful when
+you want to partition your data or you need to write a large amount of data.  A
+basic dataset write is similar to writing a table except that you specify a directory
+instead of a filename.
+
+.. ipython:: python
+
+    base = pathlib.Path(tempfile.gettempdir())
+    dataset_root = base / "sample_dataset"
+    dataset_root.mkdir(exist_ok=True)
+
+    table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
+    ds.write_dataset(table, dataset_root, format="parquet")
+
+The above example will create a single file named part-0.parquet in our sample_dataset
+directory.
+
+.. warning::
+
+    If you run the example again it will replace the existing part-0.parquet file.
+    Appending files to an existing dataset is not currently supported by this API and
+    the output directory should be empty for predictable results.
+
+Writing partitioned data
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+A partitioning object can be used to specify how your output data should be partitioned.
+This uses the same kind of partitioning objects we used for reading datasets.  To write
+our above data out to a partitioned directory we only need to specify how we want the
+dataset to be partitioned.  For example:
+
+.. ipython:: python
+
+    part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor="hive"
+    )
+    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)
+
+This will create two files.  Half our data will be in the dataset_root/c=1 directory and
+the other half will be in the dataset_root/c=2 directory.
+
+Writing large amounts of data
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The above examples wrote data from a table.  If you are writing a large amount of data
+you may not be able to load everything into a single in-memory table.  Fortunately, the
+write_dataset method also accepts an iterable of record batches.  This makes it really
+simple, for example, to repartition a large dataset without loading the entire dataset
+into memory:
+
+.. ipython:: python
+
+    old_part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor="hive"
+    )
+    new_part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor=None
+    )
+    input_dataset = ds.dataset(dataset_root, partitioning=old_part)
+    new_root = base / "repartitioned_dataset"
+    # A scanner can act as an iterator of record batches but you could also receive
+    # data from the network (e.g. via flight), from your own scanning, or from any
+    # other method that yields record batches.  In addition, you can pass a dataset
+    # into write_dataset directly but this method is useful if you want to customize
+    # the scanner (e.g. to filter the input dataset or set a maximum batch size)
+    scanner = input_dataset.scanner()
+
+    ds.write_dataset(scanner, new_root, format="parquet", partitioning=new_part)
+
+After the above example runs our data will be in dataset_root/1 and dataset_root/2

Review comment:
    Since this example is not changing the partitioning structure (except for the flavor), it's not directly clear what this "repartitioning" will do apart from changing `/c=1/` into `/1/`. Will it also repartition individual files? (e.g. write more, smaller files in case your input dataset has large files)

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  This is
+useful if the dataset is small or there is only a small amount of data that needs to
+be read.  The dataset API contains additional methods to read and process large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  This
+method returns an iterator of record batches.  For example, we can use this method to
+calculate the average of a column without loading the entire column into memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+        col2_sum += pc.sum(batch.column('col2')).as_py()
+        count += batch.num_rows
+    mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and pyarrow
+uses an object called a :class:`Scanner` to do this.  A Scanner is created for you
+automatically by the to_table and to_batches method of the dataset.  Any arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`.  This controls the maximum size of the

Review comment:
       ```suggestion
   One of those parameters is the ``batch_size``.  This controls the maximum size of the
   ```
   
   (rst uses double backticks for code, in contrast to markdown ..)

##########
File path: python/pyarrow/_hdfs.pyx
##########
@@ -93,9 +93,10 @@ cdef class HadoopFileSystem(FileSystem):
         Instantiate HadoopFileSystem object from an URI string.
 
         The following two calls are equivalent
-        * HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test'
-                                    '&replication=1')
-        * HadoopFileSystem('localhost', port=8020, user='test', replication=1)
+
+        * ``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test&replication=1')`` # noqa: E501
+
+        * ``HadoopFileSystem('localhost', port=8020, user='test', replication=1)`` # noqa: E501

Review comment:
    I added a suggestion below to fix this. I think that's preferable, since the ``# noqa: E501`` would actually show up as normal text, which would be strange in the online rendered docstring.

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  This is
+useful if the dataset is small or there is only a small amount of data that needs to
+be read.  The dataset API contains additional methods to read and process large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  This
+method returns an iterator of record batches.  For example, we can use this method to
+calculate the average of a column without loading the entire column into memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):

Review comment:
    Maybe add a `columns=["col2"]` as well? (it shows passing a column selection, and it is actually what you should do here if you are only summing that column)
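
    For illustration, a hedged sketch of what the documented loop would look like with that projection added; the tiny parquet file written below is only a stand-in for the dataset used in the docs.

```python
import pathlib
import tempfile

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Illustrative stand-in dataset: one parquet file with a nullable 'col2' column.
tmp_file = pathlib.Path(tempfile.mkdtemp()) / "example.parquet"
pq.write_table(
    pa.table({"col1": list(range(6)),
              "col2": [1.0, None, 2.0, 3.0, None, 4.0]}),
    tmp_file,
)
dataset = ds.dataset(tmp_file, format="parquet")

col2_sum = 0
count = 0
# columns=["col2"] projects only the column being summed, and the filter
# drops the null rows before they reach the loop body.
for batch in dataset.to_batches(columns=["col2"],
                                filter=~ds.field("col2").is_null()):
    col2_sum += pc.sum(batch.column("col2")).as_py()
    count += batch.num_rows

print(col2_sum / count)  # 2.5 for the data above
```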

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  This is
+useful if the dataset is small or there is only a small amount of data that needs to
+be read.  The dataset API contains additional methods to read and process large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  This
+method returns an iterator of record batches.  For example, we can use this method to
+calculate the average of a column without loading the entire column into memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+        col2_sum += pc.sum(batch.column('col2')).as_py()
+        count += batch.num_rows
+    mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and pyarrow
+uses an object called a :class:`Scanner` to do this.  A Scanner is created for you
+automatically by the to_table and to_batches method of the dataset.  Any arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`.  This controls the maximum size of the
+batches returned by the scanner.  Batches can still be smaller than the `batch_size`
+if the dataset consists of small files or those files themselves consist of small
+row groups.  For example, a parquet file with 10,000 rows per row group will yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller value.
+
+The default batch size is one million rows and this is typically a good default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset.  This can be useful when
+you want to partition your data or you need to write a large amount of data.  A
+basic dataset write is similar to writing a table except that you specify a directory
+instead of a filename.
+
+.. ipython:: python
+
+    base = pathlib.Path(tempfile.gettempdir())
+    dataset_root = base / "sample_dataset"
+    dataset_root.mkdir(exist_ok=True)
+
+    table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
+    ds.write_dataset(table, dataset_root, format="parquet")
+
+The above example will create a single file named part-0.parquet in our sample_dataset
+directory.
+
+.. warning::
+
+    If you run the example again it will replace the existing part-0.parquet file.
+    Appending files to an existing dataset is not currently supported by this API and
+    the output directory should be empty for predictable results.
+
+Writing partitioned data
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+A partitioning object can be used to specify how your output data should be partitioned.
+This uses the same kind of partitioning objects we used for reading datasets.  To write
+our above data out to a partitioned directory we only need to specify how we want the
+dataset to be partitioned.  For example:
+
+.. ipython:: python
+
+    part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor="hive"
+    )
+    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)
+
+This will create two files.  Half our data will be in the dataset_root/c=1 directory and
+the other half will be in the dataset_root/c=2 directory.
+
+Writing large amounts of data
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The above examples wrote data from a table.  If you are writing a large amount of data
+you may not be able to load everything into a single in-memory table.  Fortunately, the
+write_dataset method also accepts an iterable of record batches.  This makes it really
+simple, for example, to repartition a large dataset without loading the entire dataset
+into memory:
+
+.. ipython:: python
+
+    old_part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor="hive"
+    )
+    new_part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor=None
+    )
+    input_dataset = ds.dataset(dataset_root, partitioning=old_part)
+    new_root = base / "repartitioned_dataset"
+    # A scanner can act as an iterator of record batches but you could also receive
+    # data from the network (e.g. via flight), from your own scanning, or from any
+    # other method that yields record batches.  In addition, you can pass a dataset
+    # into write_dataset directly but this method is useful if you want to customize
+    # the scanner (e.g. to filter the input dataset or set a maximum batch size)
+    scanner = input_dataset.scanner()
+
+    ds.write_dataset(scanner, new_root, format="parquet", partitioning=new_part)
+
+After the above example runs our data will be in dataset_root/1 and dataset_root/2
+directories.  You could also use this mechnaism to change which columns you are partitioned

Review comment:
       ```suggestion
   directories.  You could also use this mechnaism to change which columns the dataset is partitioned
   ```

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  This is
+useful if the dataset is small or there is only a small amount of data that needs to
+be read.  The dataset API contains additional methods to read and process large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  This
+method returns an iterator of record batches.  For example, we can use this method to
+calculate the average of a column without loading the entire column into memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+        col2_sum += pc.sum(batch.column('col2')).as_py()
+        count += batch.num_rows
+    mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and pyarrow
+uses an object called a :class:`Scanner` to do this.  A Scanner is created for you
+automatically by the to_table and to_batches method of the dataset.  Any arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`.  This controls the maximum size of the
+batches returned by the scanner.  Batches can still be smaller than the `batch_size`
+if the dataset consists of small files or those files themselves consist of small
+row groups.  For example, a parquet file with 10,000 rows per row group will yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller value.
+
+The default batch size is one million rows and this is typically a good default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset.  This can be useful when

Review comment:
       ```suggestion
   The dataset API also simplifies writing data to a dataset using :func:`write_dataset` .  This can be useful when
   ```
   
    (this is not necessarily the best addition, but I think it would be good to add this explicit reference _somewhere_ in this paragraph; it's useful so that the user has an easy link to click through to the more detailed function docstring)



