You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "jorisvandenbossche (via GitHub)" <gi...@apache.org> on 2023/05/12 05:56:26 UTC

[GitHub] [iceberg] jorisvandenbossche commented on a diff in pull request #6775: Python: Add positional deletes

jorisvandenbossche commented on code in PR #6775:
URL: https://github.com/apache/iceberg/pull/6775#discussion_r1191927724


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -498,6 +504,49 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+@lru_cache
+def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
+    if file_format == FileFormat.PARQUET:
+        return ds.ParquetFileFormat(**kwargs)
+    else:
+        raise ValueError(f"Unsupported file format: {file_format}")
+
+
+def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
+    _, path = PyArrowFileIO.parse_location(data_file.file_path)
+    return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
+
+
+def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
+    delete_fragment = _construct_fragment(
+        fs, data_file, file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}
+    )
+    table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+    table.unify_dictionaries()

Review Comment:
   ```suggestion
       table = table.unify_dictionaries()
   ```
   
   This method is not in-place: it returns a new table, so you have to re-assign the result



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -498,6 +504,49 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+@lru_cache
+def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
+    if file_format == FileFormat.PARQUET:
+        return ds.ParquetFileFormat(**kwargs)
+    else:
+        raise ValueError(f"Unsupported file format: {file_format}")
+
+
+def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
+    _, path = PyArrowFileIO.parse_location(data_file.file_path)
+    return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
+
+
+def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
+    delete_fragment = _construct_fragment(
+        fs, data_file, file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}
+    )
+    table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+    table.unify_dictionaries()
+    return {
+        file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
+        for file in table.column("file_path").chunks[0].dictionary
+    }
+
+
+def _create_positional_deletes_indices(positional_deletes: List[pa.ChunkedArray], fn_rows: Callable[[], int]) -> pa.Array:
+    sorted_deleted = merge(*positional_deletes)
+
+    def generator() -> Generator[int, None, None]:
+        deleted_pos = next(sorted_deleted).as_py()  # type: ignore
+        for pos in range(fn_rows()):
+            if deleted_pos == pos:
+                try:
+                    deleted_pos = next(sorted_deleted).as_py()  # type: ignore
+                except StopIteration:
+                    deleted_pos = -1
+            else:
+                yield pos
+
+    # Filter on the positions
+    return pa.array(generator(), type=pa.int64())

Review Comment:
   I haven't looked closely and don't fully understand what this is doing, but I am wondering: is this doing something that you _should_ ideally be able to do with pyarrow functionality? (since it's doing some transformation from ChunkedArrays to a single Array)
   
   Is this essentially merging the chunked arrays + "inverting" the positions to delete into positions to keep?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org