Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/04 19:46:06 UTC

[GitHub] [iceberg] Fokko commented on a diff in pull request #6069: Python: TableScan Plan files API implementation without residual evaluation

Fokko commented on code in PR #6069:
URL: https://github.com/apache/iceberg/pull/6069#discussion_r1014404025


##########
python/pyiceberg/cli/output.py:
##########
@@ -49,6 +52,10 @@ def describe_table(self, table: Table) -> None:
     def files(self, table: Table, io: FileIO, history: bool) -> None:
         ...
 
+    @abstractmethod
+    def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:

Review Comment:
   ```suggestion
       def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: Optional[int] = None) -> None:
   ```
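The `Optional[...]` suggestions in this review (here and for `uuid`) mean the same thing to a type checker as the `X | None` spelling. The practical difference is that a PEP 604 union such as `int | None` can only be evaluated at runtime on Python 3.10 and later, unless annotation evaluation is postponed. A quick sketch of the equivalence:

```python
import sys
from typing import Optional, Union

# Optional[X] is shorthand for Union[X, None]; type checkers treat them identically.
assert Optional[int] == Union[int, None]

if sys.version_info >= (3, 10):
    # PEP 604 unions (`int | None`) can only be built at runtime from Python 3.10 on.
    assert (int | None) == Optional[int]
```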



##########
python/pyiceberg/cli/output.py:
##########
@@ -182,7 +202,7 @@ def _out(self, d: Any) -> None:
     def exception(self, ex: Exception) -> None:
         self._out({"type": ex.__class__.__name__, "message": str(ex)})
 
-    def identifiers(self, identifiers: List[Identifier]) -> None:
+    def identifiers(self, identifiers: list[Identifier]) -> None:

Review Comment:
   ```suggestion
       def identifiers(self, identifiers: List[Identifier]) -> None:
   ```
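The same reasoning applies to `List` versus `list` (this suggestion recurs below for the other `identifiers` methods). The thread does not state the motivation, but one likely factor is that the builtin `list[...]` is only subscriptable at runtime from Python 3.9 (PEP 585), while `typing.List[...]` also works on older interpreters. A small sketch:

```python
import sys
from typing import List, get_args, get_origin

# typing.List[str] is subscriptable at runtime on older interpreters too
# (get_origin/get_args themselves need Python 3.8+).
hint = List[str]
assert get_origin(hint) is list
assert get_args(hint) == (str,)

if sys.version_info >= (3, 9):
    # PEP 585: the builtin `list` only became subscriptable at runtime in Python 3.9.
    assert get_origin(list[str]) is list
```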



##########
python/pyiceberg/table/__init__.py:
##########
@@ -90,3 +92,15 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
     def history(self) -> List[SnapshotLogEntry]:
         """Get the snapshot history of this table."""
         return self.metadata.snapshot_log
+
+    def new_scan(self, io: FileIO, snapshot_id: Optional[int] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE):

Review Comment:
   ```suggestion
       def new_scan(self, io: FileIO, snapshot_id: Optional[int] = None, expression: BooleanExpression = ALWAYS_TRUE):
   ```
   Since we default to `AlwaysTrue`, I think we can drop the `Optional` from the type hint.
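A minimal sketch of the suggested signature, using hypothetical stand-in classes rather than the real pyiceberg types (and returning the expression only so the effect is visible): because the default already supplies an expression, the parameter never needs to be `None`.

```python
from typing import Optional


class BooleanExpression:
    """Hypothetical stand-in for pyiceberg's BooleanExpression."""


class AlwaysTrue(BooleanExpression):
    """Hypothetical stand-in for pyiceberg's AlwaysTrue."""


ALWAYS_TRUE = AlwaysTrue()


def new_scan(snapshot_id: Optional[int] = None, expression: BooleanExpression = ALWAYS_TRUE) -> BooleanExpression:
    # Omitting `expression` yields ALWAYS_TRUE, so the body never sees None
    # and needs no `expression or ALWAYS_TRUE` fallback.
    return expression
```

Default values are evaluated once, when the function is defined, so every call shares the same `ALWAYS_TRUE` instance; that is fine as long as the expression object is never mutated.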



##########
python/pyiceberg/cli/console.py:
##########
@@ -361,3 +361,24 @@ def table(ctx: Context, identifier: str, property_name: str):  # noqa: F811
         ctx.exit(1)
     else:
         raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")
+
+
+@run.group()
+def scan():
+    """Create a table scan."""
+
+
+@scan.command("table")
+@click.argument("identifier")
+@click.pass_context
+@catch_exception()
+def scan_table(ctx: Context, identifier: str):

Review Comment:
   Why not move this into the `scan` subcommand? That way we don't end up with similar commands.



##########
python/pyiceberg/cli/output.py:
##########
@@ -66,7 +73,7 @@ def spec(self, spec: PartitionSpec) -> None:
         ...
 
     @abstractmethod
-    def uuid(self, uuid: Optional[UUID]) -> None:
+    def uuid(self, uuid: UUID | None) -> None:

Review Comment:
   ```suggestion
       def uuid(self, uuid: Optional[UUID]) -> None:
   ```



##########
python/pyiceberg/cli/output.py:
##########
@@ -146,6 +153,19 @@ def files(self, table: Table, io: FileIO, history: bool) -> None:
                     manifest_tree.add(f"Datafile: {manifest_entry.data_file.file_path}")
         Console().print(snapshot_tree)
 
+    def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
+        snapshot_tree = Tree(f"Snapshot: {snapshot_id}")
+
+        manifest_dict = {}
+        for file in plan_files:
+            if file.manifest.manifest_path not in manifest_dict:

Review Comment:
   Nit: Should we move `file.manifest.manifest_path` into a local variable, since we use it three times?
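A sketch of the grouping step with the path hoisted into a local variable. It leaves out the `rich` tree and swaps the explicit `not in` check for `dict.setdefault`; the dataclasses and `group_by_manifest` are hypothetical stand-ins, not the PR's code.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True)
class ManifestFile:
    """Hypothetical stand-in with only the field used here."""

    manifest_path: str


@dataclass(frozen=True)
class FileScanTask:
    """Hypothetical stand-in; the real task holds a DataFile rather than a path."""

    manifest: ManifestFile
    data_file_path: str


def group_by_manifest(plan_files: Iterable[FileScanTask]) -> Dict[str, List[str]]:
    manifest_dict: Dict[str, List[str]] = {}
    for file in plan_files:
        # Look up the attribute chain once and reuse the local name.
        manifest_path = file.manifest.manifest_path
        manifest_dict.setdefault(manifest_path, []).append(file.data_file_path)
    return manifest_dict
```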



##########
python/pyiceberg/expressions/visitors.py:
##########
@@ -436,8 +436,10 @@ def __init__(self, partition_struct_schema: Schema, partition_filter: BooleanExp
         self.partition_filter = bind(partition_struct_schema, rewrite_not(partition_filter), case_sensitive)
 
     def eval(self, manifest: ManifestFile) -> bool:
+        print(f"Evaluating ManifestFile = {manifest} with partition_filter={self.partition_filter}")

Review Comment:
   I think this is left over from debugging. Maybe convert it into a log statement?
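One way to do that with the standard `logging` module, as a sketch: `eval_manifest` is a hypothetical function, and its return value is a placeholder for the real evaluation.

```python
import logging

logger = logging.getLogger(__name__)


def eval_manifest(manifest_path: str, partition_filter: str) -> bool:
    # Lazy %-style arguments: the message is only formatted when a DEBUG record
    # is actually handled, and nothing is shown unless DEBUG logging is enabled.
    logger.debug("Evaluating ManifestFile = %s with partition_filter=%s", manifest_path, partition_filter)
    # Hypothetical placeholder for the real partition-filter evaluation.
    return True
```

Unlike `print`, this stays silent under the default logging configuration and can be switched on per module when debugging.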



##########
python/pyiceberg/cli/output.py:
##########
@@ -38,7 +41,7 @@ def exception(self, ex: Exception) -> None:
         ...
 
     @abstractmethod
-    def identifiers(self, identifiers: List[Identifier]) -> None:
+    def identifiers(self, identifiers: list[Identifier]) -> None:

Review Comment:
   ```suggestion
       def identifiers(self, identifiers: List[Identifier]) -> None:
   ```



##########
python/pyiceberg/cli/output.py:
##########
@@ -88,7 +95,7 @@ def exception(self, ex: Exception) -> None:
         else:
             Console(stderr=True).print(ex)
 
-    def identifiers(self, identifiers: List[Identifier]) -> None:
+    def identifiers(self, identifiers: list[Identifier]) -> None:

Review Comment:
   ```suggestion
       def identifiers(self, identifiers: List[Identifier]) -> None:
   ```



##########
python/pyiceberg/cli/output.py:
##########
@@ -200,8 +220,11 @@ def schema(self, schema: Schema) -> None:
     def files(self, table: Table, io: FileIO, history: bool) -> None:
         pass
 
+    def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
+        pass
+
     def spec(self, spec: PartitionSpec) -> None:
         print(spec.json())
 
-    def uuid(self, uuid: Optional[UUID]) -> None:
+    def uuid(self, uuid: UUID | None) -> None:

Review Comment:
   ```suggestion
       def uuid(self, uuid: Optional[UUID]) -> None:
   ```



##########
python/pyiceberg/expressions/visitors.py:
##########
@@ -613,6 +615,9 @@ def visit_or(self, left_result: bool, right_result: bool) -> bool:
 def manifest_evaluator(
     partition_spec: PartitionSpec, schema: Schema, partition_filter: BooleanExpression, case_sensitive: bool = True
 ) -> Callable[[ManifestFile], bool]:
-    partition_schema = Schema(*partition_spec.partition_type(schema))
+    partition_type = partition_spec.partition_type(schema)
+    partition_schema = Schema(*partition_type.fields)
+    partition_filter = partition_filter or AlwaysTrue()

Review Comment:
   Since `partition_filter` cannot be `None` according to its type, this statement has no effect.
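The reasoning, sketched with hypothetical stand-in classes and assuming the real expression classes do not override `__bool__` or `__len__`: `x or default` only falls back when `x` is falsy, and instances of such classes are always truthy.

```python
class BooleanExpression:
    """Hypothetical stand-in that, like a plain class, defines no __bool__ or __len__."""


class AlwaysTrue(BooleanExpression):
    """Hypothetical stand-in for pyiceberg's AlwaysTrue."""


def resolve_filter(partition_filter: BooleanExpression) -> BooleanExpression:
    # Every BooleanExpression instance is truthy, so for any argument that
    # matches the type hint this returns it unchanged.
    return partition_filter or AlwaysTrue()
```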



##########
python/pyiceberg/cli/output.py:
##########
@@ -164,7 +184,7 @@ def schema(self, schema: Schema) -> None:
     def spec(self, spec: PartitionSpec) -> None:
         Console().print(str(spec))
 
-    def uuid(self, uuid: Optional[UUID]) -> None:
+    def uuid(self, uuid: UUID | None) -> None:

Review Comment:
   ```suggestion
       def uuid(self, uuid: Optional[UUID]) -> None:
   ```



##########
python/pyiceberg/table/__init__.py:
##########
@@ -90,3 +92,15 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
     def history(self) -> List[SnapshotLogEntry]:
         """Get the snapshot history of this table."""
         return self.metadata.snapshot_log
+
+    def new_scan(self, io: FileIO, snapshot_id: Optional[int] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE):
+        """Create a new scan for this table."""
+        from pyiceberg.table.scan import DataTableScan
+
+        if not (use_snapshot := snapshot_id or self.metadata.current_snapshot_id):
+            raise ValueError("Unable to resolve a snapshot to use for this scan.")
+
+        if not (snapshot := self.snapshot_by_id(use_snapshot)):

Review Comment:
   Same thing here: the walrus operator doesn't add anything.
   ```suggestion
           if not self.snapshot_by_id(use_snapshot):
   ```



##########
python/pyiceberg/expressions/visitors.py:
##########
@@ -517,7 +519,7 @@ def visit_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
         pos = term.ref().accessor.position
         field = self.partition_fields[pos]
 
-        if field.lower_bound is None:
+        if field.lower_bound is None or field.upper_bound is None:

Review Comment:
   Great catch! Thanks!
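Why the extra `upper_bound` check matters: with only `lower_bound` checked, a summary that has a lower bound but no upper bound would presumably reach a comparison against `None`, which raises `TypeError` in Python 3. The sketch below is a simplified, hypothetical version of that equality check (`may_match_equal` is an illustrative name, and it uses plain `int` bounds instead of serialized bytes). It assumes, as in Iceberg's Java `ManifestEvaluator` to our understanding, that a missing bound means no non-null values were recorded, so an equality predicate on a non-null literal cannot match.

```python
from typing import Optional

ROWS_MIGHT_MATCH = True
ROWS_CANNOT_MATCH = False


def may_match_equal(lower_bound: Optional[int], upper_bound: Optional[int], value: int) -> bool:
    # A missing bound is treated as "no non-null values recorded". Checking only
    # lower_bound would let a None upper_bound reach `value > upper_bound` below,
    # which raises TypeError.
    if lower_bound is None or upper_bound is None:
        return ROWS_CANNOT_MATCH
    # The value lies outside [lower_bound, upper_bound]: prune this manifest.
    if value < lower_bound or value > upper_bound:
        return ROWS_CANNOT_MATCH
    return ROWS_MIGHT_MATCH
```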



##########
python/pyiceberg/table/scan.py:
##########
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+from abc import ABC, abstractmethod
+from typing import Iterable, List, Optional
+
+from pydantic import Field
+
+from pyiceberg.expressions import AlwaysTrue, BooleanExpression
+from pyiceberg.expressions.visitors import manifest_evaluator
+from pyiceberg.io import FileIO
+from pyiceberg.manifest import DataFile, ManifestFile
+from pyiceberg.table import PartitionSpec, Snapshot, Table
+from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+
+ALWAYS_TRUE = AlwaysTrue()
+
+
+class FileScanTask(IcebergBaseModel):
+    """A scan task over a range of bytes in a single data file."""
+
+    manifest: ManifestFile = Field()
+    data_file: DataFile = Field()
+    _residual: BooleanExpression = Field()
+    spec: PartitionSpec = Field()
+    start: int = Field(default=0)
+
+    @property
+    def length(self) -> int:
+        return self.data_file.file_size_in_bytes
+
+
+class TableScan(ABC):
+    """API for configuring a table scan."""
+
+    table: Table
+    snapshot: Snapshot
+    expression: BooleanExpression
+
+    def __init__(self, table: Table, snapshot: Optional[Snapshot] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE):
+        self.table = table
+        self.expression = expression or ALWAYS_TRUE
+        if resolved_snapshot := snapshot or table.current_snapshot():
+            self.snapshot = resolved_snapshot
+        else:
+            raise ValueError("Unable to resolve to a Snapshot to use for the table scan.")
+
+    @abstractmethod
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plan tasks for this scan where each task reads a single file.
+
+        Returns:
+            Table: a tuple of tasks scanning entire files required by this scan
+        """
+
+
+class DataTableScan(TableScan):
+    """API for configuring a table scan."""
+
+    io: FileIO
+
+    def __init__(
+        self, io: FileIO, table: Table, snapshot: Optional[Snapshot] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE
+    ):
+        self.io = io
+        super().__init__(table, snapshot, expression)

Review Comment:
   Nit: the convention is to call `super().__init__()` first and then do the rest.
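A sketch of that ordering with hypothetical, simplified classes (plain strings instead of `FileIO`, `Table` and `Snapshot`). One practical reason for the convention: if the base `__init__` ever sets an attribute the subclass also sets, calling `super()` last would silently overwrite the subclass's value.

```python
class TableScan:
    """Hypothetical, simplified base class."""

    def __init__(self, table: str, snapshot_id: int) -> None:
        self.table = table
        self.snapshot_id = snapshot_id


class DataTableScan(TableScan):
    """Hypothetical, simplified subclass."""

    def __init__(self, io: str, table: str, snapshot_id: int) -> None:
        # Let the base class initialise its state first...
        super().__init__(table, snapshot_id)
        # ...then add the subclass-specific attributes.
        self.io = io
```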



##########
python/pyiceberg/table/scan.py:
##########
@@ -0,0 +1,103 @@
+    def __init__(self, table: Table, snapshot: Optional[Snapshot] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE):

Review Comment:
   Since we provide a default here, maybe we should just drop the `Optional` from the type hint:
   ```suggestion
       def __init__(self, table: Table, snapshot: Optional[Snapshot] = None, expression: BooleanExpression = ALWAYS_TRUE):
   ```



##########
python/pyiceberg/table/__init__.py:
##########
@@ -90,3 +92,15 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
     def history(self) -> List[SnapshotLogEntry]:
         """Get the snapshot history of this table."""
         return self.metadata.snapshot_log
+
+    def new_scan(self, io: FileIO, snapshot_id: Optional[int] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE):
+        """Create a new scan for this table."""
+        from pyiceberg.table.scan import DataTableScan
+
+        if not (use_snapshot := snapshot_id or self.metadata.current_snapshot_id):

Review Comment:
   The walrus operator doesn't add any value here; I would suggest removing it:
   ```suggestion
           if not snapshot_id and not self.metadata.current_snapshot_id:
   ```
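For comparison, a sketch of the snapshot-resolution logic with and without the walrus operator. Both hypothetical functions behave identically; the second simply separates the assignment from the check.

```python
from typing import Optional


def resolve_with_walrus(snapshot_id: Optional[int], current_snapshot_id: Optional[int]) -> int:
    # Original style: bind `use_snapshot` and test it in a single expression.
    if not (use_snapshot := snapshot_id or current_snapshot_id):
        raise ValueError("Unable to resolve a snapshot to use for this scan.")
    return use_snapshot


def resolve_without_walrus(snapshot_id: Optional[int], current_snapshot_id: Optional[int]) -> int:
    # Same behaviour, with the assignment and the check on separate lines.
    use_snapshot = snapshot_id or current_snapshot_id
    if not use_snapshot:
        raise ValueError("Unable to resolve a snapshot to use for this scan.")
    return use_snapshot
```

Both versions rely on truthiness, so a snapshot ID of `0` would be treated as missing; an explicit `is None` check avoids that, if it matters.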



##########
python/tests/table/test_scan.py:
##########
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+import glob
+import json
+import os
+import tempfile
+from typing import Tuple
+
+import pytest
+from fastavro import reader, writer
+
+from pyiceberg.expressions import (
+    BooleanExpression,
+    EqualTo,
+    IsNull,
+    Reference,
+)
+from pyiceberg.expressions.literals import literal
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.table import Table
+from tests.conftest import LocalFileIO
+from tests.io.test_io import LocalInputFile
+
+
+@pytest.fixture
+def temperatures_table() -> Table:

Review Comment:
   I'm hesitant to keep actual binaries in the repository. We already generate the manifest list and the manifest itself:
   https://github.com/apache/iceberg/blob/b215c4861a0c967f8d2a2b0444697d7877d33992/python/tests/conftest.py#L952-L967
   How would you feel about generating some Parquet files too?



##########
python/pyiceberg/cli/output.py:
##########
@@ -14,9 +14,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations

Review Comment:
   This should be fixed now that https://github.com/apache/iceberg/pull/6114 is in. Postponed evaluation of annotations (that's what this import does) can be helpful sometimes :)
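What the import changes, as a minimal sketch (the stub function is hypothetical): with `from __future__ import annotations` (PEP 563), annotations are stored as unevaluated strings instead of being evaluated when the function is defined, so `int | None` does not fail even on interpreters older than 3.10.

```python
from __future__ import annotations


def scan_plan_files(snapshot_id: int | None = None) -> None:
    """Hypothetical stub: only its annotations matter for this example."""


# The annotations are kept as source strings rather than evaluated objects.
annotations_as_strings = scan_plan_files.__annotations__
```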



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org