Posted to issues@iceberg.apache.org by "grobgl (via GitHub)" <gi...@apache.org> on 2023/05/18 13:39:28 UTC

[GitHub] [iceberg] grobgl commented on issue #5800: Integrate pyiceberg with Dask

grobgl commented on issue #5800:
URL: https://github.com/apache/iceberg/issues/5800#issuecomment-1553073273

   I'm keen to push this forward. @TomAugspurger's implementation works in single-threaded mode but fails in a distributed scenario due to the current lack of pickle support (I raised a separate issue, #7644).
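
To illustrate why pickle support matters here: a distributed scheduler has to serialise the callable and its arguments to ship them to worker processes, so any object holding a non-picklable resource breaks the graph. The sketch below uses only the standard library; `ReaderWithHandle` and `PicklableReader` are hypothetical stand-ins, not PyIceberg classes, and the `__getstate__`/`__setstate__` workaround is just one possible approach, not necessarily what #7644 will do.

```python
import pickle
import threading


class ReaderWithHandle:
    """Hypothetical reader that holds a non-picklable resource (a lock)."""

    def __init__(self, location: str):
        self.location = location
        self._lock = threading.Lock()  # thread locks cannot be pickled

    def __call__(self, task: str) -> str:
        with self._lock:
            return f"{self.location}/{task}"


class PicklableReader(ReaderWithHandle):
    """Same reader, but the lock is dropped on pickling and rebuilt on load."""

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = threading.Lock()


def can_pickle(obj) -> bool:
    """Return True if `obj` survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


plain_ok = can_pickle(ReaderWithHandle("s3://bucket/table"))
fixed_ok = can_pickle(PicklableReader("s3://bucket/table"))
restored = pickle.loads(pickle.dumps(PicklableReader("s3://bucket/table")))
```

In single-threaded mode nothing is ever serialised, which is consistent with the original implementation working there and failing only once workers are involved.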
   
   Extending Tom's approach, here is a solution that uses Dask's `from_map` and `DataFrameIOFunction`, which lets us pass projected columns to the Parquet reader:
   ```python
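   # Imports assumed for the PyIceberg and Dask releases current at the time of
   # writing; `_file_to_table` is a private PyIceberg helper and may change.
   from typing import List, Sequence

   import dask.dataframe as dd
   import pandas as pd
   from dask.dataframe.io.utils import DataFrameIOFunction
   from pyarrow.fs import FileSystem, FSSpecHandler, PyFileSystem
   from pyiceberg.expressions import BooleanExpression
   from pyiceberg.expressions.visitors import bind, extract_field_ids
   from pyiceberg.io.pyarrow import PyArrowFileIO, _file_to_table, schema_to_pyarrow
   from pyiceberg.schema import Schema
   from pyiceberg.table import DataScan, FileScanTask
   from pyiceberg.types import ListType, MapType


   class IcebergFunctionWrapper(DataFrameIOFunction):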
       def __init__(
           self,
           fs: FileSystem,
           bound_row_filter: BooleanExpression,
           projected_schema: Schema,
           case_sensitive: bool,
       ):
           self._fs = fs
           self._bound_row_filter = bound_row_filter
           self._projected_schema = projected_schema
           self._case_sensitive = case_sensitive
           self._projected_field_ids = {
               id for id in projected_schema.field_ids
               if not isinstance(projected_schema.find_type(id), (MapType, ListType))
           }.union(extract_field_ids(bound_row_filter))
           super().__init__()
       
       @property
       def columns(self) -> List[str]:
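           # Must return the names; without `return` the property yields None
           # and the equality check in project_columns can never succeed.
           return self._projected_schema.column_names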
   
       @property
       def empty_table(self) -> pd.DataFrame:
           return schema_to_pyarrow(self._projected_schema).empty_table().to_pandas(date_as_object=False)
   
       def project_columns(self, columns: Sequence[str]) -> 'IcebergFunctionWrapper':
           if list(columns) == self.columns:
               return self
   
           return IcebergFunctionWrapper(
               self._fs,
               self._bound_row_filter,
               self._projected_schema.select(*columns),
               self._case_sensitive,
           )
   
       def __call__(self, task: FileScanTask) -> pd.DataFrame:
           table = _file_to_table(
               self._fs,
               task,
               self._bound_row_filter,
               self._projected_schema,
               self._projected_field_ids,
               self._case_sensitive,
               0,  # no limit support yet
           )
   
           if table is None:
               return self.empty_table
   
           return table.to_pandas(date_as_object=False)
   
   
   def to_dask_dataframe(scan: DataScan) -> dd.DataFrame:
       tasks = scan.plan_files()
       table = scan.table
       row_filter = scan.row_filter
       projected_schema = scan.projection()
       case_sensitive = scan.case_sensitive
   
       scheme, _ = PyArrowFileIO.parse_location(table.location())
       if isinstance(table.io, PyArrowFileIO):
           fs = table.io.get_fs(scheme)
       else:
           try:
               from pyiceberg.io.fsspec import FsspecFileIO
   
               if isinstance(table.io, FsspecFileIO):
                   fs = PyFileSystem(FSSpecHandler(table.io.get_fs(scheme)))
               else:
                   raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}")
           except ModuleNotFoundError as e:
               # When FsSpec is not installed
               raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}") from e
   
       bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
   
       io_func = IcebergFunctionWrapper(fs, bound_row_filter, projected_schema, case_sensitive)
   
       return dd.from_map(
           io_func,
           tasks,
           meta=io_func.empty_table,
           enforce_metadata=False,
       )
   ```
   I'm also looking into adding `divisions` support and row-group-level parallelisation to this.
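
As a rough illustration of what `divisions` support would need: Dask expects a tuple of `npartitions + 1` sorted index values, where entry `i` is the lowest index value in partition `i` and the last entry is the highest value in the final partition. The sketch below assumes each file exposes (lower, upper) bounds for the index column; `divisions_from_bounds` and the integer bounds are hypothetical, and how the bounds would be read from Iceberg metadata is left open.

```python
from typing import Optional, Sequence, Tuple

# Hypothetical (lower, upper) statistics for the index column of one data file.
Bounds = Tuple[int, int]


def divisions_from_bounds(bounds: Sequence[Bounds]) -> Optional[Tuple[int, ...]]:
    """Return Dask-style divisions, or None when they cannot be determined.

    `bounds` must be given in partition order. Divisions are only known when
    every range is valid and consecutive ranges do not touch or overlap.
    """
    if not bounds:
        return None
    for lower, upper in bounds:
        if lower > upper:
            return None
    for (_, prev_upper), (next_lower, _) in zip(bounds, bounds[1:]):
        if prev_upper >= next_lower:  # overlapping or out-of-order files
            return None
    # One lower bound per partition, plus the upper bound of the last one.
    return tuple(lower for lower, _ in bounds) + (bounds[-1][1],)


known = divisions_from_bounds([(0, 9), (10, 19), (20, 29)])
unknown = divisions_from_bounds([(0, 15), (10, 19)])
```

When the function returns `None`, the dataframe would simply be built with unknown divisions, as the code above already does.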
   
   Generally, should this be part of the Dask library instead of PyIceberg?

