You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "grobgl (via GitHub)" <gi...@apache.org> on 2023/05/18 13:39:28 UTC
[GitHub] [iceberg] grobgl commented on issue #5800: Integrate pyiceberg with Dask
grobgl commented on issue #5800:
URL: https://github.com/apache/iceberg/issues/5800#issuecomment-1553073273
I'm keen to push this forward. @TomAugspurger's implementation works in single-threaded mode but fails in a distributed scenario due to the current lack of pickle support (I raised a separate issue, #7644).
Extending Tom's approach, this is a solution which utilises Dask's `from_map` and `DataFrameIOFunction`, which allow us to pass projected columns to the Parquet reader:
```python
class IcebergFunctionWrapper(DataFrameIOFunction):
    """Callable that reads one Iceberg ``FileScanTask`` into a pandas DataFrame.

    Instances hold everything ``_file_to_table`` needs (filesystem, bound row
    filter, projected schema, case sensitivity) so that the object can be
    handed to ``dd.from_map`` and invoked once per task. Implementing
    ``columns`` and ``project_columns`` lets the caller request a narrower
    wrapper that only reads a subset of the projected columns.
    """

    def __init__(
        self,
        fs: FileSystem,
        bound_row_filter: BooleanExpression,
        projected_schema: Schema,
        case_sensitive: bool,
    ):
        """Store the read configuration and pre-compute the field ids to read.

        Args:
            fs: Filesystem passed through to ``_file_to_table``.
            bound_row_filter: Row filter already bound to the table schema.
            projected_schema: Schema describing the columns to produce.
            case_sensitive: Forwarded unchanged to ``_file_to_table``.
        """
        self._fs = fs
        self._bound_row_filter = bound_row_filter
        self._projected_schema = projected_schema
        self._case_sensitive = case_sensitive
        # Every projected field whose type is not a map or list, plus every
        # field referenced by the row filter (which must be read in order to
        # evaluate the filter even when it is not part of the projection).
        self._projected_field_ids = {
            field_id
            for field_id in projected_schema.field_ids
            if not isinstance(projected_schema.find_type(field_id), (MapType, ListType))
        }.union(extract_field_ids(bound_row_filter))
        super().__init__()

    @property
    def columns(self) -> List[str]:
        """Names of the columns in the projected schema."""
        # The original snippet evaluated this expression without returning it,
        # so the property was always ``None`` and the equality shortcut in
        # ``project_columns`` could never trigger.
        return self._projected_schema.column_names

    @property
    def empty_table(self) -> pd.DataFrame:
        """Zero-row DataFrame built from the projected schema."""
        return schema_to_pyarrow(self._projected_schema).empty_table().to_pandas(date_as_object=False)

    def project_columns(self, columns: Sequence[str]) -> 'IcebergFunctionWrapper':
        """Return a wrapper restricted to ``columns``.

        Returns ``self`` when ``columns`` already equals the current column
        list; otherwise builds a new wrapper whose schema is
        ``projected_schema.select(*columns)`` with the same filesystem,
        filter and case sensitivity.
        """
        if list(columns) == self.columns:
            return self
        return IcebergFunctionWrapper(
            self._fs,
            self._bound_row_filter,
            self._projected_schema.select(*columns),
            self._case_sensitive,
        )

    def __call__(self, task: FileScanTask) -> pd.DataFrame:
        """Read ``task`` via ``_file_to_table`` and convert it to pandas.

        Falls back to ``empty_table`` when ``_file_to_table`` returns ``None``.
        """
        table = _file_to_table(
            self._fs,
            task,
            self._bound_row_filter,
            self._projected_schema,
            self._projected_field_ids,
            self._case_sensitive,
            0,  # no limit support yet
        )
        if table is None:
            return self.empty_table
        return table.to_pandas(date_as_object=False)
def to_dask_dataframe(scan: DataScan) -> dd.DataFrame:
    """Create a Dask DataFrame by mapping a file reader over a scan's tasks.

    The scan's planned file tasks are passed to ``dd.from_map`` together with
    an ``IcebergFunctionWrapper`` configured from the scan's table, row
    filter, projection and case sensitivity.

    Args:
        scan: The Iceberg scan to read.

    Returns:
        The result of ``dd.from_map`` over the planned tasks.

    Raises:
        ValueError: If the table's IO is neither ``PyArrowFileIO`` nor
            ``FsspecFileIO``, or if a ``ModuleNotFoundError`` occurs while
            resolving the fsspec-backed filesystem.
    """
    file_tasks = scan.plan_files()
    iceberg_table = scan.table
    unbound_filter = scan.row_filter
    schema = scan.projection()
    is_case_sensitive = scan.case_sensitive

    scheme, _ = PyArrowFileIO.parse_location(iceberg_table.location())

    if not isinstance(iceberg_table.io, PyArrowFileIO):
        try:
            from pyiceberg.io.fsspec import FsspecFileIO

            if not isinstance(iceberg_table.io, FsspecFileIO):
                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {iceberg_table.io}")
            # Adapt the fsspec filesystem to the PyArrow filesystem interface.
            filesystem = PyFileSystem(FSSpecHandler(iceberg_table.io.get_fs(scheme)))
        except ModuleNotFoundError as err:
            # Raised when FsSpec is not installed.
            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {iceberg_table.io}") from err
    else:
        filesystem = iceberg_table.io.get_fs(scheme)

    reader = IcebergFunctionWrapper(
        filesystem,
        bind(iceberg_table.schema(), unbound_filter, case_sensitive=is_case_sensitive),
        schema,
        is_case_sensitive,
    )
    empty_frame = reader.empty_table
    return dd.from_map(reader, file_tasks, meta=empty_frame, enforce_metadata=False)
```
I'm also looking into adding `divisions` support and row-group-level parallelisation to this.
Generally, should this be part of the Dask library instead of PyIceberg?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org