Posted to issues@iceberg.apache.org by "TomAugspurger (via GitHub)" <gi...@apache.org> on 2023/02/24 15:33:01 UTC

[GitHub] [iceberg] TomAugspurger commented on issue #5800: Integrate pyiceberg with Dask

TomAugspurger commented on issue #5800:
URL: https://github.com/apache/iceberg/issues/5800#issuecomment-1443857629

   Here's a rough version of converting a `DataScan` to a Dask DataFrame.
   
   ```python
   from pyiceberg.catalog import load_catalog
   from pyiceberg.expressions import GreaterThanOrEqual
   
   def _file_to_pandas(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive):
       from pyiceberg.io.pyarrow import _file_to_table
   
       return _file_to_table(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive).to_pandas()
   
   
   def to_dask_dataframe(sc):
       """Convert a DataScane to a Dask DataFrame"""
       from pyiceberg.io.pyarrow import (
           PyArrowFileIO, bind, extract_field_ids, schema_to_pyarrow, MapType, ListType,
       )
       import pyarrow as pa
       import dask
       import dask.dataframe as dd    
   
       # arguments
       tasks = list(sc.plan_files())
       table = sc.table
       row_filter = sc.row_filter
       projected_schema = sc.projection()
       case_sensitive = sc.case_sensitive
   
       # stuff stolen from to_arrow()
       if isinstance(table.io, PyArrowFileIO):
           scheme, _ = PyArrowFileIO.parse_location(table.location())
           fs = table.io.get_fs(scheme)
       else:
           raise ValueError(f"Expected PyArrowFileIO, got: {table.io}")
   
       bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
   
       projected_field_ids = {
           id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
       }.union(extract_field_ids(bound_row_filter))
   
       # build the Dask DataFrame
       schema = schema_to_pyarrow(projected_schema)
       names = [x.name for x in projected_schema.fields]
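       # empty pandas DataFrame with the projected columns/dtypes; used as Dask's partition metadata ("meta")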
       meta = pa.table([[]] * len(schema.names), schema=schema).to_pandas()
       # TODO: ensure deterministic
       token = dask.base.tokenize(fs, bound_row_filter, projected_schema, projected_field_ids, case_sensitive)
       name = f'from-iceberg-{token}'
   
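       # low-level Dask graph: one key per scan task, each calling _file_to_pandas on a single file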
       dsk = {
           (name, i): (
               _file_to_pandas, fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive
           )
           for i, task in enumerate(tasks)
       }
       divisions = [None] * len(dsk)
       df = dd.DataFrame(dsk, name, meta, divisions)
   
       return df
   ```
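   
   For context, here's a hypothetical way to exercise it; the catalog name, table identifier, and column names below are placeholders for whatever you have configured, not anything from pyiceberg itself:
   
   ```python
   from pyiceberg.catalog import load_catalog
   from pyiceberg.expressions import GreaterThanOrEqual
   
   # placeholders: point these at your own catalog and table
   catalog = load_catalog("default")
   tbl = catalog.load_table("examples.nyc_taxi")
   
   sc = tbl.scan(
       row_filter=GreaterThanOrEqual("trip_distance", 10.0),
       selected_fields=("VendorID", "tpep_pickup_datetime", "trip_distance"),
   )
   
   df = to_dask_dataframe(sc)
   df.head()
   ```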
   
   It seems to work, but I haven't tested it beyond a basic `df.head()`. A few notes:
   
   1. This returns a Dask DataFrame with one Dask partition per scan task from `.plan_files()`, which I think works out to one partition per Parquet data file.
   2. Currently, `divisions` is set to None, which is sub-optimal (xref https://docs.dask.org/en/stable/dataframe-design.html?highlight=divisions#partitions, https://docs.dask.org/en/stable/dataframe-parquet.html?highlight=divisions#calculating-divisions). The per-file statistics on `DataFile` (e.g. per-column lower/upper bounds) might help with setting those properly; see the sketch after this list.
   3. I've never used (py)iceberg before, but I was pleasantly surprised by how straightforward this was. It's mostly copy-pasting bits and pieces out of `pyiceberg.io.pyarrow`; with a proper refactor it would be even fewer net new lines of code.
   4. On the Dask side, we would be interested in making this more sophisticated, so that operations on the Dask DataFrame can be pushed back into the original scan (https://github.com/dask/dask/issues/9970 and linked issues).
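   
   On (2), here's a rough, untested sketch of how divisions could be derived: assuming the table is globally sorted by a single column and that each `DataFile` records lower/upper bounds for it, the scan tasks can be ordered by that column's lower bound, decoding the bounds with `pyiceberg.conversions.from_bytes` (if I'm reading that module right). The helper itself and the sortedness assumption are hypothetical, not something pyiceberg provides:
   
   ```python
   from pyiceberg.conversions import from_bytes
   
   def _tasks_and_divisions(tasks, table_schema, index_column):
       """Order scan tasks by `index_column` and derive Dask divisions from file statistics."""
       field = table_schema.find_field(index_column)
   
       def decode(task, attr):
           raw = getattr(task.file, attr).get(field.field_id)
           return None if raw is None else from_bytes(field.field_type, raw)
   
       stats = [(decode(t, "lower_bounds"), decode(t, "upper_bounds"), t) for t in tasks]
       if any(lo is None or hi is None for lo, hi, _ in stats):
           return tasks, None  # missing statistics: keep the order, leave divisions unknown
   
       stats.sort(key=lambda s: s[0])
       # Dask wants npartitions + 1 sorted boundary values
       divisions = [lo for lo, _, _ in stats] + [stats[-1][1]]
       if divisions != sorted(divisions):
           return tasks, None  # overlapping files: divisions would be invalid
       return [t for _, _, t in stats], divisions
   ```
   
   The returned `tasks` and `divisions` would then replace `list(sc.plan_files())` and the `[None] * len(dsk)` placeholder in `to_dask_dataframe`.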



