Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/13 18:16:29 UTC

[GitHub] [iceberg] Fokko commented on a diff in pull request #4717: Python: Add PartitionSpec

Fokko commented on code in PR #4717:
URL: https://github.com/apache/iceberg/pull/4717#discussion_r872655581


##########
python/src/iceberg/table/partitioning.py:
##########
@@ -64,3 +69,95 @@ def __str__(self):
 
     def __repr__(self):
         return f"PartitionField(field_id={self.field_id}, name={self.name}, transform={repr(self.transform)}, source_id={self.source_id})"
+
+    def __hash__(self):
+        return hash((self.source_id, self.field_id, self.name, self.transform))
+
+
+class PartitionSpec:
+    """
+    PartitionSpec capture the transformation from table data to partition values
+
+    Attributes:
+        schema(Schema): the schema of data table
+        spec_id(int): any change to PartitionSpec will produce a new specId
+        fields(List[PartitionField): list of partition fields to produce partition values
+        last_assigned_field_id(int): auto-increment partition field id starting from PARTITION_DATA_ID_START
+    """
+
+    def __init__(self, schema: Schema, spec_id: int, fields: Iterable[PartitionField], last_assigned_field_id: int):
+        self._schema = schema
+        self._spec_id = spec_id
+        self._fields = tuple(fields)
+        self._last_assigned_field_id = last_assigned_field_id
+        # derived
+        self._fields_by_source_id: Dict[int, List[PartitionField]] = {}
+
+    @property
+    def schema(self) -> Schema:
+        return self._schema
+
+    @property
+    def spec_id(self) -> int:
+        return self._spec_id
+
+    @property
+    def fields(self) -> Tuple[PartitionField, ...]:
+        return self._fields
+
+    @property
+    def last_assigned_field_id(self) -> int:
+        return self._last_assigned_field_id
+
+    def __eq__(self, other):
+        return self.spec_id == other.spec_id and self.fields == other.fields
+
+    def __str__(self):
+        if self.is_unpartitioned():
+            return "[]"
+        else:
+            delimiter = "\n  "
+            partition_fields_in_str = (str(partition_field) for partition_field in self.fields)
+            head = f"[{delimiter}"
+            tail = f"\n]"
+            return f"{head}{delimiter.join(partition_fields_in_str)}{tail}"
+
+    def __repr__(self):
+        return f"PartitionSpec: {str(self)}"
+
+    def __hash__(self):
+        return hash((self.spec_id, self.fields))
+
+    def is_unpartitioned(self) -> bool:
+        return len(self.fields) < 1
+
+    def fields_by_source_id(self, field_id: int) -> List[PartitionField]:
+        if not self._fields_by_source_id:
+            for partition_field in self.fields:
+                source_column = self.schema.find_column_name(partition_field.source_id)
+                if not source_column:
+                    raise ValueError(f"Cannot find source column: {partition_field.source_id}")
+                existing = self._fields_by_source_id.get(partition_field.source_id, [])
+                existing.append(partition_field)
+                self._fields_by_source_id[partition_field.source_id] = existing
+        return self._fields_by_source_id[field_id]
+
+    def compatible_with(self, other) -> bool:

Review Comment:
   ```suggestion
       def compatible_with(self, other: "PartitionSpec") -> bool:
   ```
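A minimal sketch of why the suggested annotation is quoted. It uses a hypothetical, stripped-down `PartitionSpec` that only has `spec_id`, not the real constructor. Python versions before 3.14 evaluate annotations eagerly unless `from __future__ import annotations` is used, and the class name is not bound yet while its body runs. On those versions the string form acts as a forward reference:

```python
from typing import get_type_hints


class PartitionSpec:
    """Hypothetical, stripped-down stand-in for the class under review."""

    def __init__(self, spec_id: int):
        self.spec_id = spec_id

    # While the class body executes, the name PartitionSpec is not bound yet.
    # With eager annotation evaluation, an unquoted annotation here would raise
    # NameError at definition time. Quoting it defers resolution.
    def compatible_with(self, other: "PartitionSpec") -> bool:
        return self.spec_id == other.spec_id


# Once the class exists at module level, the string can be resolved.
hints = get_type_hints(PartitionSpec.compatible_with)
print(hints["other"] is PartitionSpec)  # True
```

Type checkers such as mypy treat the quoted name exactly like the unquoted one, so the hint still documents that `other` must be a `PartitionSpec`.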



##########
python/src/iceberg/table/partitioning.py:
##########
@@ -14,8 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from typing import Dict, Iterable, List, Tuple
+
+from iceberg.schema import Schema
 from iceberg.transforms import Transform
 
+_PARTITION_DATA_ID_START: int = 1000
+
 
 class PartitionField:

Review Comment:
   Instead of implementing `__eq__` and `__hash__` by hand, we could leverage the `dataclasses` module. If we set `eq=True` and `frozen=True` (which makes the class immutable, which is also nice), we get `__hash__` automatically:
   
   > If `eq` and `frozen` are both true, by default [dataclass()](https://docs.python.org/3/library/dataclasses.html#dataclasses.dataclass) will generate a `__hash__()` method for you. If `eq` is true and `frozen` is false, `__hash__()` will be set to `None`, marking it unhashable (which it is, since it is mutable). If `eq` is false, `__hash__()` will be left untouched meaning the `__hash__()` method of the superclass will be used (if the superclass is [object](https://docs.python.org/3/library/functions.html#object), this means it will fall back to id-based hashing).
   
   More information here: https://docs.python.org/3/library/dataclasses.html
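Here is a sketch of what `PartitionField` could look like as a frozen dataclass, assuming the same four attributes that the existing `__repr__` prints. `transform` is typed as a plain `str` purely as a stand-in for `iceberg.transforms.Transform`:

```python
from dataclasses import FrozenInstanceError, dataclass


# eq=True is already the default; it is spelled out to mirror the suggestion.
@dataclass(eq=True, frozen=True)
class PartitionField:
    source_id: int
    field_id: int
    name: str
    transform: str  # stand-in: the real field holds an iceberg Transform


a = PartitionField(source_id=1, field_id=1000, name="id_bucket", transform="bucket[16]")
b = PartitionField(source_id=1, field_id=1000, name="id_bucket", transform="bucket[16]")

# __eq__ and __hash__ are generated from all fields, in declaration order.
print(a == b)              # True
print(hash(a) == hash(b))  # True
print(len({a, b}))         # 1

# frozen=True turns attribute assignment into an error.
try:
    a.name = "renamed"
except FrozenInstanceError:
    print("immutable")


# For contrast: eq=True with frozen=False leaves instances unhashable.
@dataclass
class MutableField:
    source_id: int


print(MutableField.__hash__ is None)  # True
```

The generated `__hash__` hashes a tuple of the field values. On the real class, the `Transform` objects must therefore be hashable themselves, and must define equality, for this to work.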



##########
python/src/iceberg/table/partitioning.py:
##########
@@ -64,3 +69,95 @@ def __str__(self):
 
     def __repr__(self):
         return f"PartitionField(field_id={self.field_id}, name={self.name}, transform={repr(self.transform)}, source_id={self.source_id})"
+
+    def __hash__(self):
+        return hash((self.source_id, self.field_id, self.name, self.transform))
+
+
+class PartitionSpec:
+    """
+    PartitionSpec capture the transformation from table data to partition values
+
+    Attributes:
+        schema(Schema): the schema of data table
+        spec_id(int): any change to PartitionSpec will produce a new specId
+        fields(List[PartitionField): list of partition fields to produce partition values
+        last_assigned_field_id(int): auto-increment partition field id starting from PARTITION_DATA_ID_START
+    """
+
+    def __init__(self, schema: Schema, spec_id: int, fields: Iterable[PartitionField], last_assigned_field_id: int):
+        self._schema = schema
+        self._spec_id = spec_id
+        self._fields = tuple(fields)
+        self._last_assigned_field_id = last_assigned_field_id
+        # derived
+        self._fields_by_source_id: Dict[int, List[PartitionField]] = {}
+
+    @property
+    def schema(self) -> Schema:
+        return self._schema
+
+    @property
+    def spec_id(self) -> int:
+        return self._spec_id
+
+    @property
+    def fields(self) -> Tuple[PartitionField, ...]:
+        return self._fields
+
+    @property
+    def last_assigned_field_id(self) -> int:
+        return self._last_assigned_field_id
+
+    def __eq__(self, other):
+        return self.spec_id == other.spec_id and self.fields == other.fields
+
+    def __str__(self):
+        if self.is_unpartitioned():
+            return "[]"
+        else:
+            delimiter = "\n  "
+            partition_fields_in_str = (str(partition_field) for partition_field in self.fields)
+            head = f"[{delimiter}"
+            tail = f"\n]"
+            return f"{head}{delimiter.join(partition_fields_in_str)}{tail}"
+
+    def __repr__(self):
+        return f"PartitionSpec: {str(self)}"
+
+    def __hash__(self):
+        return hash((self.spec_id, self.fields))
+
+    def is_unpartitioned(self) -> bool:
+        return len(self.fields) < 1
+
+    def fields_by_source_id(self, field_id: int) -> List[PartitionField]:
+        if not self._fields_by_source_id:
+            for partition_field in self.fields:
+                source_column = self.schema.find_column_name(partition_field.source_id)
+                if not source_column:
+                    raise ValueError(f"Cannot find source column: {partition_field.source_id}")
+                existing = self._fields_by_source_id.get(partition_field.source_id, [])
+                existing.append(partition_field)
+                self._fields_by_source_id[partition_field.source_id] = existing
+        return self._fields_by_source_id[field_id]
+
+    def compatible_with(self, other) -> bool:
+        """
+        Returns true if this partition spec is equivalent to the other, with partition field_id ignored.
+        That is, if both specs have the same number of fields, field order, field name, source column ids, and transforms.
+        """
+        if self == other:
+            return True
+        if len(self.fields) != len(other.fields):
+            return False
+        for index in range(len(self.fields)):

Review Comment:
   I would write this as:
   ```python
   return all(
       this_field.source_id == that_field.source_id
       and this_field.transform == that_field.transform
       and this_field.name == that_field.name
       for this_field, that_field in zip(self.fields, other.fields)
   )
   ```
   Or rely on the `eq` method :)
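The following puts the suggestion into a full `compatible_with`. It is sketched on hypothetical, simplified frozen-dataclass versions of `PartitionField` and `PartitionSpec` that have no schema and use a string stand-in for `transform`:

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class PartitionField:
    source_id: int
    field_id: int
    name: str
    transform: str  # stand-in for iceberg.transforms.Transform


@dataclass(frozen=True)
class PartitionSpec:
    spec_id: int
    fields: Tuple[PartitionField, ...]

    def compatible_with(self, other: "PartitionSpec") -> bool:
        """True if both specs have the same fields in the same order, ignoring field_id."""
        if self == other:
            return True
        # zip() stops at the shorter input, so lengths must be compared first.
        if len(self.fields) != len(other.fields):
            return False
        return all(
            this_field.source_id == that_field.source_id
            and this_field.transform == that_field.transform
            and this_field.name == that_field.name
            for this_field, that_field in zip(self.fields, other.fields)
        )


spec_a = PartitionSpec(0, (PartitionField(1, 1000, "id_bucket", "bucket[16]"),))
spec_b = PartitionSpec(1, (PartitionField(1, 1001, "id_bucket", "bucket[16]"),))
print(spec_a.compatible_with(spec_b))  # True: only spec_id and field_id differ
```

Without the explicit length check, an unpartitioned spec would compare as compatible with every other spec, because `zip` over an empty tuple yields nothing and `all` of nothing is `True`.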



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

