You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text conversion.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/20 23:01:38 UTC

[GitHub] [iceberg] rdblue commented on a diff in pull request #6233: Python: Implement DataScan.plan_files

rdblue commented on code in PR #6233:
URL: https://github.com/apache/iceberg/pull/6233#discussion_r1027369463


##########
python/pyiceberg/table/__init__.py:
##########
@@ -199,16 +223,114 @@ def use_ref(self, name: str):
 
         raise ValueError(f"Cannot scan unknown ref={name}")
 
-    def select(self, *field_names: str) -> TableScan:
+    def select(self, *field_names: str) -> S:
         if "*" in self.selected_fields:
             return self.update(selected_fields=field_names)
         return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))
 
-    def filter_rows(self, new_row_filter: BooleanExpression) -> TableScan:
+    def filter_rows(self, new_row_filter: BooleanExpression) -> S:
         return self.update(row_filter=And(self.row_filter, new_row_filter))
 
-    def filter_partitions(self, new_partition_filter: BooleanExpression) -> TableScan:
+    def filter_partitions(self, new_partition_filter: BooleanExpression) -> S:
         return self.update(partition_filter=And(self.partition_filter, new_partition_filter))
 
-    def with_case_sensitive(self, case_sensitive: bool = True) -> TableScan:
+    def with_case_sensitive(self, case_sensitive: bool = True) -> S:
         return self.update(case_sensitive=case_sensitive)
+
+
+class ScanTask(ABC):
+    pass
+
+
+@dataclass(init=False)
+class FileScanTask(ScanTask):
+    data_file: DataFile
+    start: int
+    length: int
+
+    def __init__(self, data_file: DataFile, start: Optional[int] = None, length: Optional[int] = None):
+        self.data_file = data_file
+        self.start = start or 0
+        self.length = length or data_file.file_size_in_bytes
+
+
+class _DictAsStruct(StructProtocol):
+    pos_to_name: dict[int, str]
+    wrapped: dict[str, Any]
+
+    def __init__(self, partition_type: StructType):
+        self.pos_to_name = {}
+        for pos, field in enumerate(partition_type.fields):
+            self.pos_to_name[pos] = field.name
+
+    def wrap(self, to_wrap: dict[str, Any]) -> _DictAsStruct:
+        self.wrapped = to_wrap
+        return self
+
+    def get(self, pos: int) -> Any:
+        return self.wrapped[self.pos_to_name[pos]]
+
+    def set(self, pos: int, value: Any) -> None:
+        raise NotImplementedError("Cannot set values in DictAsStruct")
+
+
+class DataScan(TableScan["DataScan"]):
+    def __init__(
+        self,
+        table: Table,
+        row_filter: Optional[BooleanExpression] = None,
+        partition_filter: Optional[BooleanExpression] = None,
+        selected_fields: Tuple[str] = ("*",),
+        case_sensitive: bool = True,
+        snapshot_id: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+    ):
+        super().__init__(table, row_filter, partition_filter, selected_fields, case_sensitive, snapshot_id, options)
+
+    def plan_files(self) -> Iterator[ScanTask]:

Review Comment:
   This still needs tests, but I verified that it correctly filters both manifests, and the data files within those manifests, by the partition filter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org