Posted to issues@iceberg.apache.org by "TomAugspurger (via GitHub)" <gi...@apache.org> on 2023/02/24 15:33:01 UTC
[GitHub] [iceberg] TomAugspurger commented on issue #5800: Integrate pyiceberg with Dask
TomAugspurger commented on issue #5800:
URL: https://github.com/apache/iceberg/issues/5800#issuecomment-1443857629
Here's a rough version of converting a `DataScan` into a Dask DataFrame.
```python
# load_catalog / GreaterThanOrEqual are used to build the scan passed in (not shown)
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual


def _file_to_pandas(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive):
    """Read a single scan task into a pandas DataFrame (one Dask partition)."""
    from pyiceberg.io.pyarrow import _file_to_table

    return _file_to_table(
        fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive
    ).to_pandas()


def to_dask_dataframe(sc):
    """Convert a DataScan to a Dask DataFrame."""
    from pyiceberg.io.pyarrow import (
        PyArrowFileIO, bind, extract_field_ids, schema_to_pyarrow, MapType, ListType,
    )
    import pyarrow as pa
    import dask
    import dask.dataframe as dd

    # arguments
    tasks = list(sc.plan_files())
    table = sc.table
    row_filter = sc.row_filter
    projected_schema = sc.projection()
    case_sensitive = sc.case_sensitive

    # stuff stolen from to_arrow()
    if isinstance(table.io, PyArrowFileIO):
        scheme, _ = PyArrowFileIO.parse_location(table.location())
        fs = table.io.get_fs(scheme)
    else:
        raise ValueError(f"Expected PyArrowFileIO, got: {table.io}")

    bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
    # projected columns (skipping map/list types) plus any columns the row filter needs
    projected_field_ids = {
        field_id
        for field_id in projected_schema.field_ids
        if not isinstance(projected_schema.find_type(field_id), (MapType, ListType))
    }.union(extract_field_ids(bound_row_filter))

    # build the Dask DataFrame
    schema = schema_to_pyarrow(projected_schema)
    # empty frame with the output columns and dtypes, used by Dask as `meta`
    meta = pa.table([[]] * len(schema.names), schema=schema).to_pandas()
    # TODO: ensure deterministic
    token = dask.base.tokenize(fs, bound_row_filter, projected_schema, projected_field_ids, case_sensitive)
    name = f"from-iceberg-{token}"
    # one graph task, and therefore one partition, per scan task
    dsk = {
        (name, i): (
            _file_to_pandas, fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive
        )
        for i, task in enumerate(tasks)
    }
    # unknown divisions; Dask expects npartitions + 1 entries
    divisions = [None] * (len(dsk) + 1)
    df = dd.DataFrame(dsk, name, meta, divisions)
    return df
```
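For anyone new to Dask's low-level graphs, here is a minimal, Dask-free sketch of what the `dsk` dictionary above encodes. It assumes a simplified model of the graph spec: each key `(name, i)` maps to a task tuple `(func, *args)` that is evaluated as `func(*args)`. Real Dask schedulers also resolve keys nested inside arguments and run tasks in parallel. `read_partition`, `build_graph`, and `compute` are hypothetical helpers; `read_partition` merely stands in for `_file_to_pandas`.

```python
from typing import Any, Dict, List, Tuple

Key = Tuple[str, int]
Task = Tuple[Any, ...]


def read_partition(path: str, rows: Dict[str, List[int]]) -> List[int]:
    """Hypothetical stand-in for _file_to_pandas: 'reads' one data file."""
    return rows[path]


def build_graph(name: str, paths: List[str], rows: Dict[str, List[int]]) -> Dict[Key, Task]:
    # One task, and therefore one output partition, per planned file.
    return {(name, i): (read_partition, path, rows) for i, path in enumerate(paths)}


def compute(graph: Dict[Key, Task], name: str) -> List[List[int]]:
    """Tiny sequential evaluator: call func(*args) for each partition in key order."""
    npartitions = sum(1 for key in graph if key[0] == name)
    results = []
    for i in range(npartitions):
        func, *args = graph[(name, i)]
        results.append(func(*args))
    return results


rows = {"a.parquet": [1, 2], "b.parquet": [3], "c.parquet": [4, 5, 6]}
graph = build_graph("from-iceberg-demo", list(rows), rows)
partitions = compute(graph, "from-iceberg-demo")

# Dask derives npartitions as len(divisions) - 1, so "unknown" divisions
# need one more None than there are partitions.
divisions = [None] * (len(graph) + 1)
print(len(partitions), len(divisions))  # 3 4
```

Evaluating every partition and concatenating the results is conceptually what `df.compute()` does with the real graph.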
It seems to work, but I haven't tested it beyond a basic `df.head()`. A few notes:
1. This returns a Dask DataFrame with one partition per scan task returned by `.plan_files()`, which is probably one per Parquet data file matched by the scan.
2. Currently, `divisions` is all `None`, which is sub-optimal: with unknown divisions Dask can't use the index to skip partitions or align joins without a shuffle (see https://docs.dask.org/en/stable/dataframe-design.html?highlight=divisions#partitions, https://docs.dask.org/en/stable/dataframe-parquet.html?highlight=divisions#calculating-divisions). `DataFile` carries per-column lower and upper bounds that might help set them properly.
3. I've never used (py)iceberg before, but I was pleasantly surprised by how straightforward this was. Most of it is copy-pasted from `pyiceberg.io.pyarrow`; with a proper refactor, even fewer net new lines of code would be needed.
4. On the Dask side, we would be interested in making this more sophisticated, to allow operations on the Dask DataFrame to affect the original scan (https://github.com/dask/dask/issues/9970 and linked issues)
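Regarding note 2, here is a rough sketch of how per-file bounds could be turned into Dask divisions. `divisions_from_bounds` is a hypothetical helper, and it assumes the bounds for the chosen index column have already been decoded into comparable Python values (Iceberg stores them as serialized bytes keyed by field ID) and that each partition's index is sorted. Files are ordered by lower bound; if any two files' ranges overlap, no valid divisions exist and the helper returns `None`.

```python
from typing import List, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def divisions_from_bounds(
    bounds: Sequence[Tuple[T, T]],
) -> Optional[Tuple[List[int], List[T]]]:
    """Derive Dask divisions from per-file (lower, upper) bounds of the index column.

    Returns (order, divisions): `order` is the order in which the files should
    become partitions, and `divisions` has len(bounds) + 1 entries. Returns
    None when ranges overlap, in which case divisions must stay unknown.
    """
    if not bounds:
        return None
    # Sort files by lower bound so partitions follow index order.
    order = sorted(range(len(bounds)), key=lambda i: bounds[i][0])
    divisions: List[T] = []
    for pos, i in enumerate(order):
        lower, _upper = bounds[i]
        previous_upper = bounds[order[pos - 1]][1] if pos > 0 else None
        if previous_upper is not None and lower <= previous_upper:
            return None  # overlap: a value could belong to two partitions
        divisions.append(lower)
    # The last division is the upper bound of the last partition.
    divisions.append(bounds[order[-1]][1])
    return order, divisions


print(divisions_from_bounds([(10, 19), (0, 9), (20, 25)]))  # ([1, 0, 2], [0, 10, 20, 25])
```

The caller would reorder `tasks` by `order` before building `dsk` and pass the returned `divisions`, falling back to all-`None` divisions when the helper returns `None`.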